[GitHub] carbondata pull request #1605: [WIP] added support to compact segments in pr...

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154982418
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---
    @@ -212,4 +221,56 @@ case class CarbonAlterTableCompactionCommand(
           }
         }
       }
    +
    +  private def startCompactionForDataMap(carbonLoadModel: CarbonLoadModel,
    +      sparkSession: SparkSession): Unit = {
    +    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +    val loadMetaDataDetails = CarbonDataMergerUtil
    +      .identifySegmentsToBeMerged(carbonLoadModel,
    +        CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR),
    +        carbonLoadModel.getLoadMetadataDetails,
    +        carbonLoadModel.getCompactionType)
    +    val segments = loadMetaDataDetails.asScala.map(_.getLoadName)
    +    if (segments.nonEmpty) {
    +      CarbonSession
    +        .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
    +                   carbonLoadModel.getDatabaseName + "." +
    +                   carbonLoadModel.getTableName,
    +          segments.mkString(","))
    +      CarbonSession.threadSet(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
    +                              carbonLoadModel.getDatabaseName + "." +
    +                              carbonLoadModel.getTableName, "false")
    +      val headers = carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
    +        .map(_.getColumnName).mkString(",")
    +      val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser()
    +        .addPreAggLoadFunction(PreAggregateUtil
    +          .createChildSelectQuery(carbonTable.getTableInfo.getFactTable))).drop("preAggLoad")
    +      try {
    +        CarbonLoadDataCommand(Some(carbonTable.getDatabaseName),
    --- End diff --
   
    ok


---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154983698
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -130,6 +131,9 @@ case class CarbonLoadDataCommand(
           carbonLoadModel.setFactFilePath(factPath)
           carbonLoadModel.setAggLoadRequest(internalOptions
               .getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean)
    +      carbonLoadModel
    --- End diff --
   
    ok


---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154983970
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -78,6 +89,57 @@ object LoadPostAggregateListener extends OperationEventListener {
           }
         }
       }
    +
    +  /**
    +   * mark the merged segments as COMPACTED and write load details into table status.
    +   *
    +   * @param carbonLoadModel
    +   */
    +  private def markSegmentsAsCompacted(carbonLoadModel: CarbonLoadModel): Unit = {
    +    val loadMetadataDetailsIterator = carbonLoadModel.getLoadMetadataDetails.iterator()
    +    while(loadMetadataDetailsIterator.hasNext) {
    +      val loadMetaDataDetail = loadMetadataDetailsIterator.next()
    +      if (loadMetaDataDetail.getMergedLoadName == carbonLoadModel.getSegmentId) {
    +        loadMetaDataDetail.setSegmentStatus(SegmentStatus.COMPACTED)
    +      }
    +    }
    +    val carbonTablePath = CarbonStorePath
    +      .getCarbonTablePath(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +        .getAbsoluteTableIdentifier)
    +    SegmentStatusManager
    +      .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath,
    +        carbonLoadModel.getLoadMetadataDetails
    +          .toArray(new Array[LoadMetadataDetails](carbonLoadModel.getLoadMetadataDetails.size)))
    +  }
    +
    +}
    +
    +object AlterPreAggregateTableCompactionPostEvent extends OperationEventListener {
    --- End diff --
   
    changed the name


---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154984035
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -78,6 +89,57 @@ object LoadPostAggregateListener extends OperationEventListener {
           }
         }
       }
    +
    +  /**
    +   * mark the merged segments as COMPACTED and write load details into table status.
    +   *
    +   * @param carbonLoadModel
    +   */
    +  private def markSegmentsAsCompacted(carbonLoadModel: CarbonLoadModel): Unit = {
    +    val loadMetadataDetailsIterator = carbonLoadModel.getLoadMetadataDetails.iterator()
    +    while(loadMetadataDetailsIterator.hasNext) {
    +      val loadMetaDataDetail = loadMetadataDetailsIterator.next()
    +      if (loadMetaDataDetail.getMergedLoadName == carbonLoadModel.getSegmentId) {
    +        loadMetaDataDetail.setSegmentStatus(SegmentStatus.COMPACTED)
    +      }
    +    }
    +    val carbonTablePath = CarbonStorePath
    +      .getCarbonTablePath(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +        .getAbsoluteTableIdentifier)
    +    SegmentStatusManager
    +      .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath,
    +        carbonLoadModel.getLoadMetadataDetails
    +          .toArray(new Array[LoadMetadataDetails](carbonLoadModel.getLoadMetadataDetails.size)))
    +  }
    +
    +}
    +
    +object AlterPreAggregateTableCompactionPostEvent extends OperationEventListener {
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event
    +   * @param operationContext
    +   */
    +  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
    --- End diff --
   
    added comment


---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154986219
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java ---
    @@ -172,6 +173,16 @@
     
       private boolean isAggLoadRequest;
     
    +  private CompactionType compactionType = CompactionType.NONE;
    --- End diff --
   
    CompactionModel is not available in the LoadCommand, while LoadModel is available in both the compaction and loading commands.


---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154990350
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java ---
    @@ -172,6 +173,16 @@
     
       private boolean isAggLoadRequest;
     
    +  private CompactionType compactionType = CompactionType.NONE;
    --- End diff --
   
    I don't think it is necessary to create a new compaction model for a single parameter.


---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154990944
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -493,4 +493,20 @@ object PreAggregateUtil {
         updatedPlan
       }
     
    +  def createChildSelectQuery(tableSchema: TableSchema): String = {
    --- End diff --
   
    This SQL selects data from the child table so that it can be inserted back into that same table. The load UDF is then added to the query, and a DataFrame is created from it.
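
As a rough illustration of the query-building step described above, here is a minimal Java sketch. It assumes the query is a plain projection of the child table's columns; the comment does not say whether the real `createChildSelectQuery` also re-aggregates. The class name `ChildSelectQuerySketch`, the string-based signature, and the table and column names are all hypothetical; the method in the diff takes a `TableSchema` instead.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical stand-in for the query-building step; not the CarbonData implementation.
final class ChildSelectQuerySketch {

  // Builds "SELECT c1, c2 FROM db.table" from the child table's column names.
  static String createChildSelectQuery(String database, String table, List<String> columns) {
    if (columns.isEmpty()) {
      throw new IllegalArgumentException("child table has no columns");
    }
    StringJoiner projection = new StringJoiner(", ");
    for (String column : columns) {
      projection.add(column);
    }
    return "SELECT " + projection + " FROM " + database + "." + table;
  }

  public static void main(String[] args) {
    // Table and column names are made up for the example.
    System.out.println(createChildSelectQuery(
        "default", "maintable_agg1", Arrays.asList("maintable_city", "maintable_sales_sum")));
  }
}
```

In the diff, the resulting string is passed through `addPreAggLoadFunction` and `sparkSession.sql` before the load runs; those steps are left out of the sketch.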


---

[GitHub] carbondata issue #1605: [CARBONDATA-1526] [PreAgg] Added support to compact ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1605
 
    Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/476/



---

[GitHub] carbondata issue #1605: [CARBONDATA-1526] [PreAgg] Added support to compact ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1605
 
    Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/478/



---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154997789
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java ---
    @@ -172,6 +173,16 @@
     
       private boolean isAggLoadRequest;
     
    +  private CompactionType compactionType = CompactionType.NONE;
    --- End diff --
   
    This field is added only for compaction, but the load model is not meant for compaction. It would be better to refactor it.


---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154998197
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -493,4 +493,20 @@ object PreAggregateUtil {
         updatedPlan
       }
     
    +  def createChildSelectQuery(tableSchema: TableSchema): String = {
    --- End diff --
   
    I understand what you are doing, but I think there is a better way: create the DataFrame directly instead of going through the parser again.


---

[GitHub] carbondata issue #1605: [CARBONDATA-1526] [PreAgg] Added support to compact ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1605
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1740/



---

[GitHub] carbondata issue #1605: [CARBONDATA-1526] [PreAgg] Added support to compact ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1605
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2123/



---

[GitHub] carbondata issue #1605: [CARBONDATA-1526] [PreAgg] Added support to compact ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1605
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2126/



---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r155244586
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---
    @@ -156,10 +158,19 @@ case class CarbonAlterTableCompactionCommand(
           )
           .equalsIgnoreCase("true")
     
    -    // if system level compaction is enabled then only one compaction can run in the system
    -    // if any other request comes at this time then it will create a compaction request file.
    -    // so that this will be taken up by the compaction process which is executing.
    -    if (!isConcurrentCompactionAllowed) {
    +    // If the carbon table is a child datamap then start compaction for the same using Load
    --- End diff --
   
    Please add an interface at `DataManagementFunc` with two implementations, one for the main table and another for the agg table, and execute compaction depending on the table type.
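
One possible shape for the suggestion above, sketched in Java. Every name here (`CompactionExecutor`, `MainTableCompaction`, `AggTableCompaction`, `CompactionDispatchSketch`, the `isChildDataMap` flag) is hypothetical and only illustrates dispatching on table type; none of it is taken from the CarbonData code base.

```java
// Hypothetical interface: one compaction entry point, two implementations.
interface CompactionExecutor {
  String execute(String tableName);
}

// Compaction path for a main (fact) table: merge segments directly.
final class MainTableCompaction implements CompactionExecutor {
  @Override
  public String execute(String tableName) {
    return "merged segments of main table " + tableName;
  }
}

// Compaction path for an aggregate (child) table: reload the selected segments.
final class AggTableCompaction implements CompactionExecutor {
  @Override
  public String execute(String tableName) {
    return "reloaded selected segments of agg table " + tableName;
  }
}

final class CompactionDispatchSketch {

  // Chooses the implementation from the table type, so callers need no if/else.
  static CompactionExecutor forTable(boolean isChildDataMap) {
    return isChildDataMap ? new AggTableCompaction() : new MainTableCompaction();
  }

  public static void main(String[] args) {
    System.out.println(forTable(false).execute("maintable"));
    System.out.println(forTable(true).execute("maintable_agg1"));
  }
}
```

With this layout, the branching that the diff adds to `CarbonAlterTableCompactionCommand` would reduce to a single call on whichever implementation is selected.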


---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r155245618
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java ---
    @@ -172,6 +173,16 @@
     
       private boolean isAggLoadRequest;
     
    +  private CompactionType compactionType = CompactionType.NONE;
    --- End diff --
   
    Please add this compactionType to the operationContext; don't add it here.
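
A minimal Java sketch of carrying the compaction type in a per-operation context instead of a field on the load model. `ContextSketch` is a hypothetical stand-in written for this example, not CarbonData's `OperationContext`; the key name `compactionType` and the `CompactionKind` enum are also assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical enum standing in for the compaction type.
enum CompactionKind { NONE, MINOR, MAJOR }

// Hypothetical property bag that travels with one operation.
final class ContextSketch {
  private final Map<String, Object> properties = new HashMap<>();

  void setProperty(String key, Object value) {
    properties.put(key, value);
  }

  Object getProperty(String key) {
    return properties.get(key);
  }
}

final class CompactionTypeInContextSketch {
  static final String KEY = "compactionType";

  // Reads the compaction type, falling back to NONE when the key was never set.
  static CompactionKind compactionTypeOf(ContextSketch context) {
    Object value = context.getProperty(KEY);
    return value instanceof CompactionKind ? (CompactionKind) value : CompactionKind.NONE;
  }

  public static void main(String[] args) {
    ContextSketch context = new ContextSketch();
    System.out.println(compactionTypeOf(context)); // plain load: nothing was set
    context.setProperty(KEY, CompactionKind.MAJOR);
    System.out.println(compactionTypeOf(context)); // compaction request
  }
}
```

The fallback to `NONE` mirrors the default that the diff gives the `compactionType` field, so an ordinary load behaves the same without touching the load model.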


---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r155247755
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---
    @@ -274,8 +291,11 @@ public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry,
             if (loadStartEntry) {
               String segmentId =
                   String.valueOf(SegmentStatusManager.createNewSegmentId(listOfLoadFolderDetailsArray));
    -          newMetaEntry.setLoadName(segmentId);
               loadModel.setLoadMetadataDetails(listOfLoadFolderDetails);
    +          if (loadModel.getCompactionType() != CompactionType.NONE) {
    --- End diff --
   
    Please update the tablestatus for compaction of the agg table after the load succeeds.
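
The ordering requested above could look like the following Java sketch: merged segments are marked COMPACTED only once the load has succeeded. `SegmentEntry` and `MarkCompactedSketch` are hypothetical, in-memory stand-ins; the matching rule (merged load name equals the new segment id) follows `markSegmentsAsCompacted` from the diff discussed earlier in this thread, and writing the table status file is omitted.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory stand-in for one table status entry.
final class SegmentEntry {
  final String loadName;
  final String mergedLoadName;
  String status = "SUCCESS";

  SegmentEntry(String loadName, String mergedLoadName) {
    this.loadName = loadName;
    this.mergedLoadName = mergedLoadName;
  }
}

final class MarkCompactedSketch {

  // Marks entries merged into newSegmentId as COMPACTED, but only after a successful load.
  // Returns the number of entries changed.
  static int markCompactedAfterSuccess(
      List<SegmentEntry> entries, String newSegmentId, boolean loadSucceeded) {
    if (!loadSucceeded) {
      return 0; // a failed load must leave the existing statuses untouched
    }
    int changed = 0;
    for (SegmentEntry entry : entries) {
      if (newSegmentId.equals(entry.mergedLoadName)) {
        entry.status = "COMPACTED";
        changed++;
      }
    }
    return changed;
  }

  public static void main(String[] args) {
    List<SegmentEntry> entries = Arrays.asList(
        new SegmentEntry("0", "0.1"), new SegmentEntry("1", "0.1"), new SegmentEntry("2", null));
    System.out.println(markCompactedAfterSuccess(entries, "0.1", true));
  }
}
```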


---

[GitHub] carbondata issue #1605: [CARBONDATA-1526] [PreAgg] Added support to compact ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1605
 
    Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/526/



---

[GitHub] carbondata issue #1605: [CARBONDATA-1526] [PreAgg] Added support to compact ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1605
 
    Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/530/



---

[GitHub] carbondata issue #1605: [CARBONDATA-1526] [PreAgg] Added support to compact ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1605
 
    Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/531/



---