Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154982418 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala --- @@ -212,4 +221,56 @@ case class CarbonAlterTableCompactionCommand( } } } + + private def startCompactionForDataMap(carbonLoadModel: CarbonLoadModel, + sparkSession: SparkSession): Unit = { + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val loadMetaDataDetails = CarbonDataMergerUtil + .identifySegmentsToBeMerged(carbonLoadModel, + CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR), + carbonLoadModel.getLoadMetadataDetails, + carbonLoadModel.getCompactionType) + val segments = loadMetaDataDetails.asScala.map(_.getLoadName) + if (segments.nonEmpty) { + CarbonSession + .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + + carbonLoadModel.getDatabaseName + "." + + carbonLoadModel.getTableName, + segments.mkString(",")) + CarbonSession.threadSet(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + + carbonLoadModel.getDatabaseName + "." + + carbonLoadModel.getTableName, "false") + val headers = carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala + .map(_.getColumnName).mkString(",") + val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser() + .addPreAggLoadFunction(PreAggregateUtil + .createChildSelectQuery(carbonTable.getTableInfo.getFactTable))).drop("preAggLoad") + try { + CarbonLoadDataCommand(Some(carbonTable.getDatabaseName), --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154983698 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala --- @@ -130,6 +131,9 @@ case class CarbonLoadDataCommand( carbonLoadModel.setFactFilePath(factPath) carbonLoadModel.setAggLoadRequest(internalOptions .getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean) + carbonLoadModel --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154983970 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala --- @@ -78,6 +89,57 @@ object LoadPostAggregateListener extends OperationEventListener { } } } + + /** + * mark the merged segments as COMPACTED and write load details into table status. + * + * @param carbonLoadModel + */ + private def markSegmentsAsCompacted(carbonLoadModel: CarbonLoadModel): Unit = { + val loadMetadataDetailsIterator = carbonLoadModel.getLoadMetadataDetails.iterator() + while(loadMetadataDetailsIterator.hasNext) { + val loadMetaDataDetail = loadMetadataDetailsIterator.next() + if (loadMetaDataDetail.getMergedLoadName == carbonLoadModel.getSegmentId) { + loadMetaDataDetail.setSegmentStatus(SegmentStatus.COMPACTED) + } + } + val carbonTablePath = CarbonStorePath + .getCarbonTablePath(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + .getAbsoluteTableIdentifier) + SegmentStatusManager + .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath, + carbonLoadModel.getLoadMetadataDetails + .toArray(new Array[LoadMetadataDetails](carbonLoadModel.getLoadMetadataDetails.size))) + } + +} + +object AlterPreAggregateTableCompactionPostEvent extends OperationEventListener { --- End diff -- changed the name --- |
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154984035 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala --- @@ -78,6 +89,57 @@ object LoadPostAggregateListener extends OperationEventListener { } } } + + /** + * mark the merged segments as COMPACTED and write load details into table status. + * + * @param carbonLoadModel + */ + private def markSegmentsAsCompacted(carbonLoadModel: CarbonLoadModel): Unit = { + val loadMetadataDetailsIterator = carbonLoadModel.getLoadMetadataDetails.iterator() + while(loadMetadataDetailsIterator.hasNext) { + val loadMetaDataDetail = loadMetadataDetailsIterator.next() + if (loadMetaDataDetail.getMergedLoadName == carbonLoadModel.getSegmentId) { + loadMetaDataDetail.setSegmentStatus(SegmentStatus.COMPACTED) + } + } + val carbonTablePath = CarbonStorePath + .getCarbonTablePath(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + .getAbsoluteTableIdentifier) + SegmentStatusManager + .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath, + carbonLoadModel.getLoadMetadataDetails + .toArray(new Array[LoadMetadataDetails](carbonLoadModel.getLoadMetadataDetails.size))) + } + +} + +object AlterPreAggregateTableCompactionPostEvent extends OperationEventListener { + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override def onEvent(event: Event, operationContext: OperationContext): Unit = { --- End diff -- added comment --- |
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154986219 --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java --- @@ -172,6 +173,16 @@ private boolean isAggLoadRequest; + private CompactionType compactionType = CompactionType.NONE; --- End diff -- CompactionModel is not available in the LoadCommand, whereas LoadModel is available in both the compaction and loading commands. --- |
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154990350 --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java --- @@ -172,6 +173,16 @@ private boolean isAggLoadRequest; + private CompactionType compactionType = CompactionType.NONE; --- End diff -- I don't think it is necessary to create a new compaction model for just one parameter. --- |
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154990944 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala --- @@ -493,4 +493,20 @@ object PreAggregateUtil { updatedPlan } + def createChildSelectQuery(tableSchema: TableSchema): String = { --- End diff -- This SQL is created to select data from the child table; that data is then inserted back into the same table. The load UDF is added to this query, and a DataFrame is then created from it. --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1605 Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/476/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1605 Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/478/ --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154997789 --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java --- @@ -172,6 +173,16 @@ private boolean isAggLoadRequest; + private CompactionType compactionType = CompactionType.NONE; --- End diff -- This is added only for compaction, but the load model is not meant for compaction. It would be better to refactor it. --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154998197 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala --- @@ -493,4 +493,20 @@ object PreAggregateUtil { updatedPlan } + def createChildSelectQuery(tableSchema: TableSchema): String = { --- End diff -- I understand what you are doing, but I think there is a better way to create the DataFrame directly instead of going through the parser again. --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1605 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1740/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1605 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2123/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1605 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2126/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r155244586 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala --- @@ -156,10 +158,19 @@ case class CarbonAlterTableCompactionCommand( ) .equalsIgnoreCase("true") - // if system level compaction is enabled then only one compaction can run in the system - // if any other request comes at this time then it will create a compaction request file. - // so that this will be taken up by the compaction process which is executing. - if (!isConcurrentCompactionAllowed) { + // If the carbon table is a child datamap then start compaction for the same using Load --- End diff -- Please add an interface in `DataManagementFunc` with two implementations, one for the main table and another for the agg table, and execute compaction depending on the table type. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r155245618 --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java --- @@ -172,6 +173,16 @@ private boolean isAggLoadRequest; + private CompactionType compactionType = CompactionType.NONE; --- End diff -- Please add this compactionType to operationContext; don't add it here. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r155247755 --- Diff: processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java --- @@ -274,8 +291,11 @@ public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry, if (loadStartEntry) { String segmentId = String.valueOf(SegmentStatusManager.createNewSegmentId(listOfLoadFolderDetailsArray)); - newMetaEntry.setLoadName(segmentId); loadModel.setLoadMetadataDetails(listOfLoadFolderDetails); + if (loadModel.getCompactionType() != CompactionType.NONE) { --- End diff -- Please update the tablestatus for compaction of the agg table after the load succeeds. --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1605 Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/526/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1605 Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/530/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1605 Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/531/ --- |
Free forum by Nabble | Edit this page |