Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154970020 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala --- @@ -212,4 +221,56 @@ case class CarbonAlterTableCompactionCommand( } } } + + private def startCompactionForDataMap(carbonLoadModel: CarbonLoadModel, + sparkSession: SparkSession): Unit = { + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val loadMetaDataDetails = CarbonDataMergerUtil --- End diff -- correct the coding style in this function, please follow this coding style in future: ``` val a = foo( paramA, paramB) ``` --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154970799 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala --- @@ -212,4 +221,56 @@ case class CarbonAlterTableCompactionCommand( } } } + + private def startCompactionForDataMap(carbonLoadModel: CarbonLoadModel, + sparkSession: SparkSession): Unit = { + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val loadMetaDataDetails = CarbonDataMergerUtil + .identifySegmentsToBeMerged(carbonLoadModel, + CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR), + carbonLoadModel.getLoadMetadataDetails, + carbonLoadModel.getCompactionType) + val segments = loadMetaDataDetails.asScala.map(_.getLoadName) + if (segments.nonEmpty) { + CarbonSession + .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + + carbonLoadModel.getDatabaseName + "." + + carbonLoadModel.getTableName, + segments.mkString(",")) + CarbonSession.threadSet(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + + carbonLoadModel.getDatabaseName + "." + + carbonLoadModel.getTableName, "false") + val headers = carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala + .map(_.getColumnName).mkString(",") + val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser() --- End diff -- What does this do? --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154971443 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala --- @@ -212,4 +221,56 @@ case class CarbonAlterTableCompactionCommand( } } } + + private def startCompactionForDataMap(carbonLoadModel: CarbonLoadModel, + sparkSession: SparkSession): Unit = { + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val loadMetaDataDetails = CarbonDataMergerUtil + .identifySegmentsToBeMerged(carbonLoadModel, + CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR), + carbonLoadModel.getLoadMetadataDetails, + carbonLoadModel.getCompactionType) + val segments = loadMetaDataDetails.asScala.map(_.getLoadName) + if (segments.nonEmpty) { + CarbonSession + .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + + carbonLoadModel.getDatabaseName + "." + + carbonLoadModel.getTableName, + segments.mkString(",")) + CarbonSession.threadSet(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + + carbonLoadModel.getDatabaseName + "." + + carbonLoadModel.getTableName, "false") + val headers = carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala + .map(_.getColumnName).mkString(",") + val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser() + .addPreAggLoadFunction(PreAggregateUtil + .createChildSelectQuery(carbonTable.getTableInfo.getFactTable))).drop("preAggLoad") + try { + CarbonLoadDataCommand(Some(carbonTable.getDatabaseName), --- End diff -- When invoking command, follow this code style: ``` CarbonXXXCommand( paramA = valueA, paramB = valueB, ... ).run(sparkSession) ``` --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154972638 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala --- @@ -130,6 +131,9 @@ case class CarbonLoadDataCommand( carbonLoadModel.setFactFilePath(factPath) carbonLoadModel.setAggLoadRequest(internalOptions .getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean) + carbonLoadModel --- End diff -- Please follow the coding style: ``` a.foo( paramA, paramB) ``` --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154972340 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala --- @@ -212,4 +221,56 @@ case class CarbonAlterTableCompactionCommand( } } } + + private def startCompactionForDataMap(carbonLoadModel: CarbonLoadModel, + sparkSession: SparkSession): Unit = { + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val loadMetaDataDetails = CarbonDataMergerUtil + .identifySegmentsToBeMerged(carbonLoadModel, + CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR), + carbonLoadModel.getLoadMetadataDetails, + carbonLoadModel.getCompactionType) + val segments = loadMetaDataDetails.asScala.map(_.getLoadName) + if (segments.nonEmpty) { + CarbonSession + .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + + carbonLoadModel.getDatabaseName + "." + + carbonLoadModel.getTableName, + segments.mkString(",")) + CarbonSession.threadSet(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + + carbonLoadModel.getDatabaseName + "." + + carbonLoadModel.getTableName, "false") + val headers = carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala + .map(_.getColumnName).mkString(",") + val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser() + .addPreAggLoadFunction(PreAggregateUtil + .createChildSelectQuery(carbonTable.getTableInfo.getFactTable))).drop("preAggLoad") + try { + CarbonLoadDataCommand(Some(carbonTable.getDatabaseName), + carbonTable.getTableName, + null, + Nil, + Map("fileheader" -> headers), + isOverwriteTable = false, + dataFrame = Some(childDataFrame), + internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true", + "compactionType" -> carbonLoadModel.getCompactionType.toString)).run(sparkSession) + } finally { + // check if any other segments needs compaction on in case of MINOR_COMPACTION. 
+ // For example: after 8.1 creation 0.1, 4.1, 8.1 have to be merged to 0.2 if threshhold + // allows it. + if (!carbonLoadModel.getCompactionType.equals(CompactionType.MAJOR)) { + CommonUtil.readLoadMetadataDetails(carbonLoadModel) + startCompactionForDataMap(carbonLoadModel, sparkSession) --- End diff -- We should avoid the recursive call; you can invoke it in the caller of this function. --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154974553 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala --- @@ -78,6 +89,57 @@ object LoadPostAggregateListener extends OperationEventListener { } } } + + /** + * mark the merged segments as COMPACTED and write load details into table status. + * + * @param carbonLoadModel + */ + private def markSegmentsAsCompacted(carbonLoadModel: CarbonLoadModel): Unit = { + val loadMetadataDetailsIterator = carbonLoadModel.getLoadMetadataDetails.iterator() + while(loadMetadataDetailsIterator.hasNext) { + val loadMetaDataDetail = loadMetadataDetailsIterator.next() + if (loadMetaDataDetail.getMergedLoadName == carbonLoadModel.getSegmentId) { + loadMetaDataDetail.setSegmentStatus(SegmentStatus.COMPACTED) + } + } + val carbonTablePath = CarbonStorePath + .getCarbonTablePath(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + .getAbsoluteTableIdentifier) + SegmentStatusManager + .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath, + carbonLoadModel.getLoadMetadataDetails + .toArray(new Array[LoadMetadataDetails](carbonLoadModel.getLoadMetadataDetails.size))) + } + +} + +object AlterPreAggregateTableCompactionPostEvent extends OperationEventListener { + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override def onEvent(event: Event, operationContext: OperationContext): Unit = { --- End diff -- What does the listener do? please add comment --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154974612 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala --- @@ -78,6 +89,57 @@ object LoadPostAggregateListener extends OperationEventListener { } } } + + /** + * mark the merged segments as COMPACTED and write load details into table status. + * + * @param carbonLoadModel + */ + private def markSegmentsAsCompacted(carbonLoadModel: CarbonLoadModel): Unit = { + val loadMetadataDetailsIterator = carbonLoadModel.getLoadMetadataDetails.iterator() + while(loadMetadataDetailsIterator.hasNext) { + val loadMetaDataDetail = loadMetadataDetailsIterator.next() + if (loadMetaDataDetail.getMergedLoadName == carbonLoadModel.getSegmentId) { + loadMetaDataDetail.setSegmentStatus(SegmentStatus.COMPACTED) + } + } + val carbonTablePath = CarbonStorePath + .getCarbonTablePath(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + .getAbsoluteTableIdentifier) + SegmentStatusManager + .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath, + carbonLoadModel.getLoadMetadataDetails + .toArray(new Array[LoadMetadataDetails](carbonLoadModel.getLoadMetadataDetails.size))) + } + +} + +object AlterPreAggregateTableCompactionPostEvent extends OperationEventListener { --- End diff -- It is listener, not event --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154974766 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala --- @@ -78,6 +89,57 @@ object LoadPostAggregateListener extends OperationEventListener { } } } + + /** + * mark the merged segments as COMPACTED and write load details into table status. + * + * @param carbonLoadModel + */ + private def markSegmentsAsCompacted(carbonLoadModel: CarbonLoadModel): Unit = { + val loadMetadataDetailsIterator = carbonLoadModel.getLoadMetadataDetails.iterator() + while(loadMetadataDetailsIterator.hasNext) { + val loadMetaDataDetail = loadMetadataDetailsIterator.next() + if (loadMetaDataDetail.getMergedLoadName == carbonLoadModel.getSegmentId) { + loadMetaDataDetail.setSegmentStatus(SegmentStatus.COMPACTED) + } + } + val carbonTablePath = CarbonStorePath + .getCarbonTablePath(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + .getAbsoluteTableIdentifier) + SegmentStatusManager + .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath, + carbonLoadModel.getLoadMetadataDetails + .toArray(new Array[LoadMetadataDetails](carbonLoadModel.getLoadMetadataDetails.size))) + } + +} + +object AlterPreAggregateTableCompactionPostEvent extends OperationEventListener { + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override def onEvent(event: Event, operationContext: OperationContext): Unit = { + val compactionEvent = event.asInstanceOf[AlterTableCompactionPostEvent] + val carbonTable = compactionEvent.carbonTable + val compactionType = compactionEvent.carbonMergerMapping.campactionType + val sparkSession = compactionEvent.sQLContext.sparkSession + if (carbonTable.hasDataMapSchema) { + for (dataMapSchema: DataMapSchema <- carbonTable.getTableInfo.getDataMapSchemaList --- End diff -- use `carbonTable.getTableInfo.getDataMapSchemaList.foreach` instead --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154975193 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala --- @@ -493,4 +493,20 @@ object PreAggregateUtil { updatedPlan } + def createChildSelectQuery(tableSchema: TableSchema): String = { --- End diff -- Instead of creating a SQL string, you can construct a dataframe directly and return it to startCompactionForDataMap --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154975703 --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java --- @@ -172,6 +173,16 @@ private boolean isAggLoadRequest; + private CompactionType compactionType = CompactionType.NONE; --- End diff -- It is strange to have a compactionType in the load model; why not put it in the compaction model instead? --- |
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154978691 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala --- @@ -133,25 +133,25 @@ case class AlterTableRenameAbortEvent(carbonTable: CarbonTable, /** * * @param carbonTable - * @param carbonLoadModel + * @param carbonMergerMapping --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154978738 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala --- @@ -133,25 +133,25 @@ case class AlterTableRenameAbortEvent(carbonTable: CarbonTable, /** * * @param carbonTable - * @param carbonLoadModel + * @param carbonMergerMapping * @param mergedLoadName * @param sQLContext */ -case class AlterTableCompactionPreEvent(sparkSession: SparkSession, carbonTable: CarbonTable, - carbonLoadModel: CarbonLoadModel, +case class AlterTableCompactionPreEvent(carbonTable: CarbonTable, --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154979724 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala --- @@ -42,6 +42,14 @@ case class LoadTablePostExecutionEvent(sparkSession: SparkSession, carbonTableIdentifier: CarbonTableIdentifier, carbonLoadModel: CarbonLoadModel) extends Event with LoadEventInfo +/** + * Class for handling operations after data load completion and before final commit of load + * operation. Example usage: For loading pre-aggregate tables + */ +case class LoadTablePreStatusUpdateEvent(sparkSession: SparkSession, --- End diff -- This event will be used to perform some task just before updating the carbon table status. For example: when loading data into the parent table we need to start the load for all child datamaps, so that if any of the loads fails then the parent table status file would not be written. --- |
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154979756 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala --- @@ -69,11 +69,7 @@ object Compactor { // trigger event for compaction val operationContext = new OperationContext val alterTableCompactionPreEvent: AlterTableCompactionPreEvent = - AlterTableCompactionPreEvent(compactionCallableModel.sqlContext.sparkSession, - carbonTable, - carbonLoadModel, - mergedLoadName, - sc) + AlterTableCompactionPreEvent(carbonTable, carbonMergerMapping, mergedLoadName, sc) --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154980064 --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala --- @@ -492,6 +492,10 @@ object CarbonDataRDDFactory { throw new Exception("No Data to load") } writeDictionary(carbonLoadModel, result, writeAll = false) + val loadTablePreStatusUpdateEvent = LoadTablePreStatusUpdateEvent(sqlContext.sparkSession, --- End diff -- Used to perform an operation before committing the table status of the parent table --- |
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154980110 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala --- @@ -159,7 +161,14 @@ case class CarbonAlterTableCompactionCommand( // if system level compaction is enabled then only one compaction can run in the system // if any other request comes at this time then it will create a compaction request file. // so that this will be taken up by the compaction process which is executing. - if (!isConcurrentCompactionAllowed) { + if (carbonTable.isChildDataMap) { --- End diff -- done --- |
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154980148 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala --- @@ -159,7 +161,14 @@ case class CarbonAlterTableCompactionCommand( // if system level compaction is enabled then only one compaction can run in the system // if any other request comes at this time then it will create a compaction request file. // so that this will be taken up by the compaction process which is executing. - if (!isConcurrentCompactionAllowed) { + if (carbonTable.isChildDataMap) { + carbonLoadModel.setCompactionType(alterTableModel.compactionType.toUpperCase match { --- End diff -- done --- |
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154980576 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala --- @@ -212,4 +221,56 @@ case class CarbonAlterTableCompactionCommand( } } } + + private def startCompactionForDataMap(carbonLoadModel: CarbonLoadModel, + sparkSession: SparkSession): Unit = { + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val loadMetaDataDetails = CarbonDataMergerUtil + .identifySegmentsToBeMerged(carbonLoadModel, + CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR), + carbonLoadModel.getLoadMetadataDetails, + carbonLoadModel.getCompactionType) + val segments = loadMetaDataDetails.asScala.map(_.getLoadName) + if (segments.nonEmpty) { + CarbonSession + .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + --- End diff -- This is used to set the segments to access to load incremental data into the child table. There is no other way to set segments in CarbonScanRDD other than this --- |
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154980619 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala --- @@ -212,4 +221,56 @@ case class CarbonAlterTableCompactionCommand( } } } + + private def startCompactionForDataMap(carbonLoadModel: CarbonLoadModel, + sparkSession: SparkSession): Unit = { + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val loadMetaDataDetails = CarbonDataMergerUtil --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154982378 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala --- @@ -212,4 +221,56 @@ case class CarbonAlterTableCompactionCommand( } } } + + private def startCompactionForDataMap(carbonLoadModel: CarbonLoadModel, + sparkSession: SparkSession): Unit = { + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val loadMetaDataDetails = CarbonDataMergerUtil + .identifySegmentsToBeMerged(carbonLoadModel, + CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR), + carbonLoadModel.getLoadMetadataDetails, + carbonLoadModel.getCompactionType) + val segments = loadMetaDataDetails.asScala.map(_.getLoadName) + if (segments.nonEmpty) { + CarbonSession + .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + + carbonLoadModel.getDatabaseName + "." + + carbonLoadModel.getTableName, + segments.mkString(",")) + CarbonSession.threadSet(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + + carbonLoadModel.getDatabaseName + "." + + carbonLoadModel.getTableName, "false") + val headers = carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala + .map(_.getColumnName).mkString(",") + val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser() --- End diff -- ok --- |
Free forum by Nabble | Edit this page |