[GitHub] carbondata pull request #1605: [WIP] added support to compact segments in pr...

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154970020
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---
    @@ -212,4 +221,56 @@ case class CarbonAlterTableCompactionCommand(
           }
         }
       }
    +
    +  private def startCompactionForDataMap(carbonLoadModel: CarbonLoadModel,
    +      sparkSession: SparkSession): Unit = {
    +    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +    val loadMetaDataDetails = CarbonDataMergerUtil
    --- End diff --
   
    Please correct the coding style in this function, and follow this style in future:
    ```
         val a = foo(
            paramA,
            paramB)
    ```


---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154970799
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---
    @@ -212,4 +221,56 @@ case class CarbonAlterTableCompactionCommand(
           }
         }
       }
    +
    +  private def startCompactionForDataMap(carbonLoadModel: CarbonLoadModel,
    +      sparkSession: SparkSession): Unit = {
    +    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +    val loadMetaDataDetails = CarbonDataMergerUtil
    +      .identifySegmentsToBeMerged(carbonLoadModel,
    +        CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR),
    +        carbonLoadModel.getLoadMetadataDetails,
    +        carbonLoadModel.getCompactionType)
    +    val segments = loadMetaDataDetails.asScala.map(_.getLoadName)
    +    if (segments.nonEmpty) {
    +      CarbonSession
    +        .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
    +                   carbonLoadModel.getDatabaseName + "." +
    +                   carbonLoadModel.getTableName,
    +          segments.mkString(","))
    +      CarbonSession.threadSet(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
    +                              carbonLoadModel.getDatabaseName + "." +
    +                              carbonLoadModel.getTableName, "false")
    +      val headers = carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
    +        .map(_.getColumnName).mkString(",")
    +      val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser()
    --- End diff --
   
    What does this do?


---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154971443
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---
    @@ -212,4 +221,56 @@ case class CarbonAlterTableCompactionCommand(
           }
         }
       }
    +
    +  private def startCompactionForDataMap(carbonLoadModel: CarbonLoadModel,
    +      sparkSession: SparkSession): Unit = {
    +    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +    val loadMetaDataDetails = CarbonDataMergerUtil
    +      .identifySegmentsToBeMerged(carbonLoadModel,
    +        CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR),
    +        carbonLoadModel.getLoadMetadataDetails,
    +        carbonLoadModel.getCompactionType)
    +    val segments = loadMetaDataDetails.asScala.map(_.getLoadName)
    +    if (segments.nonEmpty) {
    +      CarbonSession
    +        .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
    +                   carbonLoadModel.getDatabaseName + "." +
    +                   carbonLoadModel.getTableName,
    +          segments.mkString(","))
    +      CarbonSession.threadSet(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
    +                              carbonLoadModel.getDatabaseName + "." +
    +                              carbonLoadModel.getTableName, "false")
    +      val headers = carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
    +        .map(_.getColumnName).mkString(",")
    +      val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser()
    +        .addPreAggLoadFunction(PreAggregateUtil
    +          .createChildSelectQuery(carbonTable.getTableInfo.getFactTable))).drop("preAggLoad")
    +      try {
    +        CarbonLoadDataCommand(Some(carbonTable.getDatabaseName),
    --- End diff --
   
    When invoking a command, follow this code style:
    ```
        CarbonXXXCommand(
           paramA = valueA,
           paramB = valueB,
           ...
        ).run(sparkSession)
    ```


---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154972638
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -130,6 +131,9 @@ case class CarbonLoadDataCommand(
           carbonLoadModel.setFactFilePath(factPath)
           carbonLoadModel.setAggLoadRequest(internalOptions
               .getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean)
    +      carbonLoadModel
    --- End diff --
   
    Please follow this coding style:
    ```
         a.foo(
           paramA,
           paramB)
    ```



---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154972340
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---
    @@ -212,4 +221,56 @@ case class CarbonAlterTableCompactionCommand(
           }
         }
       }
    +
    +  private def startCompactionForDataMap(carbonLoadModel: CarbonLoadModel,
    +      sparkSession: SparkSession): Unit = {
    +    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +    val loadMetaDataDetails = CarbonDataMergerUtil
    +      .identifySegmentsToBeMerged(carbonLoadModel,
    +        CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR),
    +        carbonLoadModel.getLoadMetadataDetails,
    +        carbonLoadModel.getCompactionType)
    +    val segments = loadMetaDataDetails.asScala.map(_.getLoadName)
    +    if (segments.nonEmpty) {
    +      CarbonSession
    +        .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
    +                   carbonLoadModel.getDatabaseName + "." +
    +                   carbonLoadModel.getTableName,
    +          segments.mkString(","))
    +      CarbonSession.threadSet(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
    +                              carbonLoadModel.getDatabaseName + "." +
    +                              carbonLoadModel.getTableName, "false")
    +      val headers = carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
    +        .map(_.getColumnName).mkString(",")
    +      val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser()
    +        .addPreAggLoadFunction(PreAggregateUtil
    +          .createChildSelectQuery(carbonTable.getTableInfo.getFactTable))).drop("preAggLoad")
    +      try {
    +        CarbonLoadDataCommand(Some(carbonTable.getDatabaseName),
    +          carbonTable.getTableName,
    +          null,
    +          Nil,
    +          Map("fileheader" -> headers),
    +          isOverwriteTable = false,
    +          dataFrame = Some(childDataFrame),
    +          internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true",
    +            "compactionType" -> carbonLoadModel.getCompactionType.toString)).run(sparkSession)
    +      } finally {
    +        // check if any other segments needs compaction on in case of MINOR_COMPACTION.
    +        // For example: after 8.1 creation 0.1, 4.1, 8.1 have to be merged to 0.2 if threshhold
    +        // allows it.
    +        if (!carbonLoadModel.getCompactionType.equals(CompactionType.MAJOR)) {
    +          CommonUtil.readLoadMetadataDetails(carbonLoadModel)
    +          startCompactionForDataMap(carbonLoadModel, sparkSession)
    --- End diff --
   
    We should avoid the recursive call; you can invoke it from the caller of this function instead.
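
    One way to read this suggestion, as a minimal sketch: the names below (`compactOnce`, `compactUntilStable`, the string segment names) are hypothetical stand-ins and not part of the CarbonData code base. A single compaction pass reports whether it merged anything, and the caller repeats the pass in a loop until nothing is left to merge, so the pass never calls itself.

    ```scala
    object CompactionLoopSketch {
      // Hypothetical stand-in for one compaction pass: merges the first
      // `threshold` segments into one and returns the new segment list,
      // or None when there are too few segments to merge.
      def compactOnce(segments: List[String], threshold: Int): Option[List[String]] = {
        require(threshold >= 2, "merging fewer than two segments would never terminate")
        if (segments.size < threshold) {
          None
        } else {
          val (toMerge, rest) = segments.splitAt(threshold)
          Some(s"merged(${toMerge.mkString("+")})" :: rest)
        }
      }

      // The caller drives repeated passes with a loop instead of recursion.
      def compactUntilStable(initial: List[String], threshold: Int): List[String] = {
        var current = initial
        var next = compactOnce(current, threshold)
        while (next.isDefined) {
          current = next.get
          next = compactOnce(current, threshold)
        }
        current
      }
    }
    ```

    With this shape, the re-check for further MINOR compaction lives in one place in the caller, and the stack depth does not grow with the number of passes.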


---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154974553
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -78,6 +89,57 @@ object LoadPostAggregateListener extends OperationEventListener {
           }
         }
       }
    +
    +  /**
    +   * mark the merged segments as COMPACTED and write load details into table status.
    +   *
    +   * @param carbonLoadModel
    +   */
    +  private def markSegmentsAsCompacted(carbonLoadModel: CarbonLoadModel): Unit = {
    +    val loadMetadataDetailsIterator = carbonLoadModel.getLoadMetadataDetails.iterator()
    +    while(loadMetadataDetailsIterator.hasNext) {
    +      val loadMetaDataDetail = loadMetadataDetailsIterator.next()
    +      if (loadMetaDataDetail.getMergedLoadName == carbonLoadModel.getSegmentId) {
    +        loadMetaDataDetail.setSegmentStatus(SegmentStatus.COMPACTED)
    +      }
    +    }
    +    val carbonTablePath = CarbonStorePath
    +      .getCarbonTablePath(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +        .getAbsoluteTableIdentifier)
    +    SegmentStatusManager
    +      .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath,
    +        carbonLoadModel.getLoadMetadataDetails
    +          .toArray(new Array[LoadMetadataDetails](carbonLoadModel.getLoadMetadataDetails.size)))
    +  }
    +
    +}
    +
    +object AlterPreAggregateTableCompactionPostEvent extends OperationEventListener {
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event
    +   * @param operationContext
    +   */
    +  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
    --- End diff --
   
    What does the listener do? Please add a comment.


---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154974612
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -78,6 +89,57 @@ object LoadPostAggregateListener extends OperationEventListener {
           }
         }
       }
    +
    +  /**
    +   * mark the merged segments as COMPACTED and write load details into table status.
    +   *
    +   * @param carbonLoadModel
    +   */
    +  private def markSegmentsAsCompacted(carbonLoadModel: CarbonLoadModel): Unit = {
    +    val loadMetadataDetailsIterator = carbonLoadModel.getLoadMetadataDetails.iterator()
    +    while(loadMetadataDetailsIterator.hasNext) {
    +      val loadMetaDataDetail = loadMetadataDetailsIterator.next()
    +      if (loadMetaDataDetail.getMergedLoadName == carbonLoadModel.getSegmentId) {
    +        loadMetaDataDetail.setSegmentStatus(SegmentStatus.COMPACTED)
    +      }
    +    }
    +    val carbonTablePath = CarbonStorePath
    +      .getCarbonTablePath(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +        .getAbsoluteTableIdentifier)
    +    SegmentStatusManager
    +      .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath,
    +        carbonLoadModel.getLoadMetadataDetails
    +          .toArray(new Array[LoadMetadataDetails](carbonLoadModel.getLoadMetadataDetails.size)))
    +  }
    +
    +}
    +
    +object AlterPreAggregateTableCompactionPostEvent extends OperationEventListener {
    --- End diff --
   
    This is a listener, not an event.


---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154974766
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -78,6 +89,57 @@ object LoadPostAggregateListener extends OperationEventListener {
           }
         }
       }
    +
    +  /**
    +   * mark the merged segments as COMPACTED and write load details into table status.
    +   *
    +   * @param carbonLoadModel
    +   */
    +  private def markSegmentsAsCompacted(carbonLoadModel: CarbonLoadModel): Unit = {
    +    val loadMetadataDetailsIterator = carbonLoadModel.getLoadMetadataDetails.iterator()
    +    while(loadMetadataDetailsIterator.hasNext) {
    +      val loadMetaDataDetail = loadMetadataDetailsIterator.next()
    +      if (loadMetaDataDetail.getMergedLoadName == carbonLoadModel.getSegmentId) {
    +        loadMetaDataDetail.setSegmentStatus(SegmentStatus.COMPACTED)
    +      }
    +    }
    +    val carbonTablePath = CarbonStorePath
    +      .getCarbonTablePath(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +        .getAbsoluteTableIdentifier)
    +    SegmentStatusManager
    +      .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath,
    +        carbonLoadModel.getLoadMetadataDetails
    +          .toArray(new Array[LoadMetadataDetails](carbonLoadModel.getLoadMetadataDetails.size)))
    +  }
    +
    +}
    +
    +object AlterPreAggregateTableCompactionPostEvent extends OperationEventListener {
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event
    +   * @param operationContext
    +   */
    +  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
    +    val compactionEvent = event.asInstanceOf[AlterTableCompactionPostEvent]
    +    val carbonTable = compactionEvent.carbonTable
    +    val compactionType = compactionEvent.carbonMergerMapping.campactionType
    +    val sparkSession = compactionEvent.sQLContext.sparkSession
    +    if (carbonTable.hasDataMapSchema) {
    +      for (dataMapSchema: DataMapSchema <- carbonTable.getTableInfo.getDataMapSchemaList
    --- End diff --
   
    Use `carbonTable.getTableInfo.getDataMapSchemaList.foreach` instead.
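
    A minimal sketch of the `foreach` form, assuming the schema list is a `java.util.List` that has to be wrapped with `asScala` first (as the other diffs in this thread do). `SchemaStub` and `childTableNames` are hypothetical names used only for illustration; they are not CarbonData classes.

    ```scala
    import scala.collection.JavaConverters._
    import scala.collection.mutable.ArrayBuffer

    object ForeachSketch {
      // Hypothetical stand-in for DataMapSchema; only a name is modelled.
      final case class SchemaStub(name: String)

      def childTableNames(schemas: java.util.List[SchemaStub]): Seq[String] = {
        val names = ArrayBuffer.empty[String]
        // asScala wraps the Java list so that foreach is available on it.
        schemas.asScala.foreach { schema =>
          names += schema.name
        }
        names.toList
      }
    }
    ```

    On Scala 2.13 and later, `scala.jdk.CollectionConverters._` is the non-deprecated import providing the same `asScala` call.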


---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154975193
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -493,4 +493,20 @@ object PreAggregateUtil {
         updatedPlan
       }
     
    +  def createChildSelectQuery(tableSchema: TableSchema): String = {
    --- End diff --
   
    Instead of creating a SQL string, you can construct a DataFrame directly and return it to startCompactionForDataMap.


---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154975703
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java ---
    @@ -172,6 +173,16 @@
     
       private boolean isAggLoadRequest;
     
    +  private CompactionType compactionType = CompactionType.NONE;
    --- End diff --
   
    It is strange to have a compactionType in the load model. Why not put it in the compaction model?
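
    The separation being asked for could look roughly like the sketch below. All types here (`LoadModelStub`, `CompactionModelStub`, `CompactionKind`) are hypothetical and only illustrate the idea; the real CarbonData model classes are not reproduced. The load model carries load-side state only, and the compaction type lives on a separate model that refers to it.

    ```scala
    object ModelSeparationSketch {
      sealed trait CompactionKind
      case object Minor extends CompactionKind
      case object Major extends CompactionKind

      // Load-side state only: nothing here knows about compaction.
      final case class LoadModelStub(databaseName: String, tableName: String)

      // Compaction-side state wraps the load model and adds the compaction type.
      final case class CompactionModelStub(loadModel: LoadModelStub, kind: CompactionKind)

      def describe(model: CompactionModelStub): String =
        s"${model.kind} compaction on ${model.loadModel.databaseName}.${model.loadModel.tableName}"
    }
    ```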


---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154978691
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala ---
    @@ -133,25 +133,25 @@ case class AlterTableRenameAbortEvent(carbonTable: CarbonTable,
     /**
      *
      * @param carbonTable
    - * @param carbonLoadModel
    + * @param carbonMergerMapping
    --- End diff --
   
    ok


---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154978738
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala ---
    @@ -133,25 +133,25 @@ case class AlterTableRenameAbortEvent(carbonTable: CarbonTable,
     /**
      *
      * @param carbonTable
    - * @param carbonLoadModel
    + * @param carbonMergerMapping
      * @param mergedLoadName
      * @param sQLContext
      */
    -case class AlterTableCompactionPreEvent(sparkSession: SparkSession, carbonTable: CarbonTable,
    -    carbonLoadModel: CarbonLoadModel,
    +case class AlterTableCompactionPreEvent(carbonTable: CarbonTable,
    --- End diff --
   
    ok



---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154979724
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala ---
    @@ -42,6 +42,14 @@ case class LoadTablePostExecutionEvent(sparkSession: SparkSession,
         carbonTableIdentifier: CarbonTableIdentifier,
         carbonLoadModel: CarbonLoadModel) extends Event with LoadEventInfo
     
    +/**
    + * Class for handling operations after data load completion and before final commit of load
    + * operation. Example usage: For loading pre-aggregate tables
    + */
    +case class LoadTablePreStatusUpdateEvent(sparkSession: SparkSession,
    --- End diff --
   
    This event is used to perform tasks just before the carbon table status is updated.
    For example, when loading data into a parent table, we need to start the load for all child datamaps first, so that if any child load fails, the parent table status file is not written.
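
    The ordering described here can be sketched as follows, under stated assumptions: `ChildLoad`, `loadParent` and the `statusLog` buffer are hypothetical stand-ins for the event listeners and the table status file, and are not taken from the pull request. Every child load runs first, and the parent status is recorded only when all of them succeed.

    ```scala
    import scala.collection.mutable
    import scala.util.{Failure, Success, Try}

    object PreStatusUpdateSketch {
      // Hypothetical child load: it either completes or throws.
      type ChildLoad = () => Unit

      // Runs every child load before the parent status is "written"
      // (appended to statusLog). Returns true when the status was written.
      def loadParent(childLoads: Seq[ChildLoad], statusLog: mutable.Buffer[String]): Boolean = {
        Try(childLoads.foreach(load => load())) match {
          case Success(_) =>
            statusLog += "parent: SUCCESS"
            true
          case Failure(_) =>
            // A child load failed, so the parent status is left untouched.
            false
        }
      }
    }
    ```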


---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154979756
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala ---
    @@ -69,11 +69,7 @@ object Compactor {
         // trigger event for compaction
         val operationContext = new OperationContext
         val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
    -      AlterTableCompactionPreEvent(compactionCallableModel.sqlContext.sparkSession,
    -        carbonTable,
    -        carbonLoadModel,
    -        mergedLoadName,
    -        sc)
    +      AlterTableCompactionPreEvent(carbonTable, carbonMergerMapping, mergedLoadName, sc)
    --- End diff --
   
    ok


---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154980064
 
    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---
    @@ -492,6 +492,10 @@ object CarbonDataRDDFactory {
             throw new Exception("No Data to load")
           }
           writeDictionary(carbonLoadModel, result, writeAll = false)
    +      val loadTablePreStatusUpdateEvent = LoadTablePreStatusUpdateEvent(sqlContext.sparkSession,
    --- End diff --
   
    This is used to perform some operations before committing the table status of the parent table.


---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154980110
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---
    @@ -159,7 +161,14 @@ case class CarbonAlterTableCompactionCommand(
         // if system level compaction is enabled then only one compaction can run in the system
         // if any other request comes at this time then it will create a compaction request file.
         // so that this will be taken up by the compaction process which is executing.
    -    if (!isConcurrentCompactionAllowed) {
    +    if (carbonTable.isChildDataMap) {
    --- End diff --
   
    done


---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154980148
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---
    @@ -159,7 +161,14 @@ case class CarbonAlterTableCompactionCommand(
         // if system level compaction is enabled then only one compaction can run in the system
         // if any other request comes at this time then it will create a compaction request file.
         // so that this will be taken up by the compaction process which is executing.
    -    if (!isConcurrentCompactionAllowed) {
    +    if (carbonTable.isChildDataMap) {
    +      carbonLoadModel.setCompactionType(alterTableModel.compactionType.toUpperCase match {
    --- End diff --
   
    done


---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154980576
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---
    @@ -212,4 +221,56 @@ case class CarbonAlterTableCompactionCommand(
           }
         }
       }
    +
    +  private def startCompactionForDataMap(carbonLoadModel: CarbonLoadModel,
    +      sparkSession: SparkSession): Unit = {
    +    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +    val loadMetaDataDetails = CarbonDataMergerUtil
    +      .identifySegmentsToBeMerged(carbonLoadModel,
    +        CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR),
    +        carbonLoadModel.getLoadMetadataDetails,
    +        carbonLoadModel.getCompactionType)
    +    val segments = loadMetaDataDetails.asScala.map(_.getLoadName)
    +    if (segments.nonEmpty) {
    +      CarbonSession
    +        .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
    --- End diff --
   
    This sets the segments to be read when loading incremental data into the child table. There is no other way to pass the segments to CarbonScanRDD.
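
    As a rough illustration of a thread-scoped setting of this kind, the sketch below keeps a per-thread property map. It is not the `CarbonSession` implementation: the object, its methods, and the key layout in `segmentsKey` are all assumptions made for the example. A value set on one thread is visible to code running later on that thread, and is not seen by other threads.

    ```scala
    object ThreadScopedSettingsSketch {
      // Hypothetical per-thread property store.
      private val props = new ThreadLocal[Map[String, String]] {
        override def initialValue(): Map[String, String] = Map.empty
      }

      def threadSet(key: String, value: String): Unit = props.set(props.get + (key -> value))

      def threadGet(key: String): Option[String] = props.get.get(key)

      def threadUnset(key: String): Unit = props.set(props.get - key)

      // Hypothetical key layout: a prefix followed by "<database>.<table>".
      def segmentsKey(database: String, table: String): String =
        s"input.segments.$database.$table"
    }
    ```

    Because the value stays attached to the thread, a caller following this pattern would normally unset it once the load finishes, so that later queries on the same thread read all segments again.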


---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154980619
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---
    @@ -212,4 +221,56 @@ case class CarbonAlterTableCompactionCommand(
           }
         }
       }
    +
    +  private def startCompactionForDataMap(carbonLoadModel: CarbonLoadModel,
    +      sparkSession: SparkSession): Unit = {
    +    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +    val loadMetaDataDetails = CarbonDataMergerUtil
    --- End diff --
   
    ok


---

[GitHub] carbondata pull request #1605: [CARBONDATA-1526] [PreAgg] Added support to c...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1605#discussion_r154982378
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---
    @@ -212,4 +221,56 @@ case class CarbonAlterTableCompactionCommand(
           }
         }
       }
    +
    +  private def startCompactionForDataMap(carbonLoadModel: CarbonLoadModel,
    +      sparkSession: SparkSession): Unit = {
    +    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +    val loadMetaDataDetails = CarbonDataMergerUtil
    +      .identifySegmentsToBeMerged(carbonLoadModel,
    +        CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR),
    +        carbonLoadModel.getLoadMetadataDetails,
    +        carbonLoadModel.getCompactionType)
    +    val segments = loadMetaDataDetails.asScala.map(_.getLoadName)
    +    if (segments.nonEmpty) {
    +      CarbonSession
    +        .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
    +                   carbonLoadModel.getDatabaseName + "." +
    +                   carbonLoadModel.getTableName,
    +          segments.mkString(","))
    +      CarbonSession.threadSet(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
    +                              carbonLoadModel.getDatabaseName + "." +
    +                              carbonLoadModel.getTableName, "false")
    +      val headers = carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
    +        .map(_.getColumnName).mkString(",")
    +      val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser()
    --- End diff --
   
    ok


---