GitHub user kevinjmh opened a pull request:
https://github.com/apache/carbondata/pull/3023

[CARBONDATA-3197][BloomDataMap] Merge bloom index before accessible

**Problem**

Currently CarbonData allows queries while bloom index files are being merged, which causes problems when the index files transition from multiple shards to the merged shard.

Timeline of the problem:
- Data is loaded into a table with a bloom datamap; bloom index files are generated during loading, and bloom index file merging starts.
- A query is fired.
- `BloomCoarseGrainDataMapFactory.getAllShardPaths` finds multiple shards while the merge is still in progress, so a `BloomCoarseGrainDataMap` is created against the detailed shard names.
- Bloom index file merging finishes and the folders with the detailed shard names are deleted.
- An exception occurs when the `BloomCoarseGrainDataMap` tries to read bloom index files from the deleted shard folders during pruning.

**Analysis**

The root cause is that we allow querying a datamap that is not in a stable state. One option is to disable the datamap while merging bloom index files, but that would repeatedly affect all segments. Another option is to make bloom index file merging part of loading, so that queries cannot access the unstable bloom index files until they are ready.

**Solution**

Change the events that `MergeBloomIndexEventListener` listens to, and do the merge work before the segment status is updated for access. A condensed sketch of the resulting listener follows this description.

Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily:

- [ ] Any interfaces changed?
- [ ] Any backward compatibility impacted?
- [ ] Document update required?
- [ ] Testing done
  Please provide details on
  - Whether new unit test cases have been added or why no new tests are required?
  - How it is tested? Please attach test report.
  - Is it a performance related change? Please attach the performance test report.
  - Any additional information to help reviewers in testing this change.
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kevinjmh/carbondata mergeBloomIndexEvent

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/3023.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3023

----
commit 881a510de38e30cfa4ae8a84be1f003c6254d9ab
Author: Manhua <kevinjmh@...>
Date: 2018-12-25T08:21:40Z

    merge bloom index before accessible

----
--- |
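For orientation, here is a condensed sketch of the revised dispatch described above, abbreviated from the diff quoted in the review comments further down. The class name, the `nonLazyBloomDatamaps` helper, and the stubbed `mergeBloomIndex` body are illustrative only, not the PR's exact code:

```scala
import scala.collection.JavaConverters._

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession

import org.apache.carbondata.core.datamap.{DataMapStoreManager, TableDataMap}
import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.events._
import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent
import org.apache.carbondata.processing.merger.CarbonDataMergerUtil

class MergeBloomIndexEventListenerSketch extends OperationEventListener with Logging {

  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
    val sparkSession = SparkSession.getActiveSession.get
    event match {
      case e: LoadTablePreStatusUpdateEvent =>
        // load: the new segment is not yet visible to queries, so merge its
        // bloom index files now, before the segment status is updated
        val table = e.getCarbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
        mergeBloomIndex(sparkSession, table, nonLazyBloomDatamaps(table),
          Seq(e.getCarbonLoadModel.getSegmentId))

      case e: AlterTableCompactionPreStatusUpdateEvent =>
        // compaction: the merged segment is likewise not yet accessible
        val segmentId = CarbonDataMergerUtil.getLoadNumberFromLoadName(e.mergedLoadName)
        mergeBloomIndex(sparkSession, e.carbonTable,
          nonLazyBloomDatamaps(e.carbonTable), Seq(segmentId))

      case e: BuildDataMapPostExecutionEvent =>
        // rebuild only: the datamap is disabled while it is being rebuilt, so the
        // unstable shards are never queried; events coming from load/compaction
        // are ignored here because the earlier cases already merged the files
        if (e.isFromRebuild && null != e.dmName) {
          // select the rebuilt bloom datamap by name and merge its index files
          // for e.segmentIdList (details omitted, see the diff below)
        }

      case _ =>
    }
  }

  // helper names below are illustrative only, not the PR's exact code
  private def nonLazyBloomDatamaps(table: CarbonTable): List[TableDataMap] =
    DataMapStoreManager.getInstance().getAllDataMap(table).asScala
      .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase(
        DataMapClassProvider.BLOOMFILTER.getShortName))
      .filter(!_.getDataMapSchema.isLazy)
      .toList

  private def mergeBloomIndex(sparkSession: SparkSession, table: CarbonTable,
      datamaps: List[TableDataMap], segmentIds: Seq[String]): Unit = {
    // in the PR this collects datamap names and index columns, then runs a
    // CarbonMergeBloomIndexFilesRDD over the given segments
  }
}
```

The key point is that the two pre-status-update cases run while the segment is still invisible, so a query can never observe the transient multi-shard layout.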
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3023 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1936/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3023 Build Success with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10189/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3023 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2145/ --- |
Github user qiuchenjian commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3023#discussion_r243901940 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala --- @@ -24,59 +24,88 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.datamap.{DataMapStoreManager, TableDataMap} import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.datamap.CarbonMergeBloomIndexFilesRDD import org.apache.carbondata.events._ +import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent +import org.apache.carbondata.processing.merger.CarbonDataMergerUtil class MergeBloomIndexEventListener extends OperationEventListener with Logging { val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) override def onEvent(event: Event, operationContext: OperationContext): Unit = { + val sparkSession = SparkSession.getActiveSession.get event match { + case loadPreStatusUpdateEvent: LoadTablePreStatusUpdateEvent => + // For loading process, segment can not be accessed at this time + val loadModel = loadPreStatusUpdateEvent.getCarbonLoadModel + val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable + val segmentId = loadModel.getSegmentId + + // filter out bloom datamap, skip lazy datamap + val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala + .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase( + DataMapClassProvider.BLOOMFILTER.getShortName)) + .filter(!_.getDataMapSchema.isLazy).toList + + mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, Seq(segmentId)) + + case compactPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent => + // For compact process, segment can not be accessed at this time + val carbonTable = compactPreStatusUpdateEvent.carbonTable + val mergedLoadName = compactPreStatusUpdateEvent.mergedLoadName + val segmentId = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName) + + // filter out bloom datamap, skip lazy datamap + val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala + .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase( + DataMapClassProvider.BLOOMFILTER.getShortName)) + .filter(!_.getDataMapSchema.isLazy).toList + + mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, Seq(segmentId)) + case datamapPostEvent: BuildDataMapPostExecutionEvent => - LOGGER.info("Load post status event-listener called for merge bloom index") + // For rebuild datamap process, datamap is disabled when rebuilding + if (!datamapPostEvent.isFromRebuild || null == datamapPostEvent.dmName) { + // ignore datamapPostEvent from loading and compaction --- End diff -- it's better to add a log here (if (!datamapPostEvent.isFromRebuild || null == datamapPostEvent.dmName) ) --- |
Github user qiuchenjian commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3023#discussion_r243904016 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala --- @@ -24,59 +24,88 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.datamap.{DataMapStoreManager, TableDataMap} import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.datamap.CarbonMergeBloomIndexFilesRDD import org.apache.carbondata.events._ +import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent +import org.apache.carbondata.processing.merger.CarbonDataMergerUtil class MergeBloomIndexEventListener extends OperationEventListener with Logging { val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) override def onEvent(event: Event, operationContext: OperationContext): Unit = { + val sparkSession = SparkSession.getActiveSession.get event match { + case loadPreStatusUpdateEvent: LoadTablePreStatusUpdateEvent => + // For loading process, segment can not be accessed at this time + val loadModel = loadPreStatusUpdateEvent.getCarbonLoadModel + val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable + val segmentId = loadModel.getSegmentId + + // filter out bloom datamap, skip lazy datamap + val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala + .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase( + DataMapClassProvider.BLOOMFILTER.getShortName)) + .filter(!_.getDataMapSchema.isLazy).toList + + mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, Seq(segmentId)) + + case compactPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent => + // For compact process, segment can not be accessed at this time + val carbonTable = compactPreStatusUpdateEvent.carbonTable + val mergedLoadName = compactPreStatusUpdateEvent.mergedLoadName + val segmentId = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName) + + // filter out bloom datamap, skip lazy datamap + val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala + .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase( + DataMapClassProvider.BLOOMFILTER.getShortName)) + .filter(!_.getDataMapSchema.isLazy).toList + + mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, Seq(segmentId)) + case datamapPostEvent: BuildDataMapPostExecutionEvent => - LOGGER.info("Load post status event-listener called for merge bloom index") + // For rebuild datamap process, datamap is disabled when rebuilding + if (!datamapPostEvent.isFromRebuild || null == datamapPostEvent.dmName) { + // ignore datamapPostEvent from loading and compaction + return + } + val carbonTableIdentifier = datamapPostEvent.identifier val carbonTable = DataMapStoreManager.getInstance().getCarbonTable(carbonTableIdentifier) - val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable) - val sparkSession = SparkSession.getActiveSession.get - // filter out bloom datamap - var bloomDatamaps = tableDataMaps.asScala.filter( - _.getDataMapSchema.getProviderName.equalsIgnoreCase( + // filter out current rebuilt bloom datamap + val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala + 
.filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase( DataMapClassProvider.BLOOMFILTER.getShortName)) - - if (datamapPostEvent.isFromRebuild) { - if (null != datamapPostEvent.dmName) { - // for rebuild process - bloomDatamaps = bloomDatamaps.filter( - _.getDataMapSchema.getDataMapName.equalsIgnoreCase(datamapPostEvent.dmName)) - } - } else { - // for load process, skip lazy datamap - bloomDatamaps = bloomDatamaps.filter(!_.getDataMapSchema.isLazy) --- End diff -- Is the reason for removing this branch that the bloom datamap is never lazy? But line 51 and line 65 also handle the lazy case? --- |
Github user qiuchenjian commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3023#discussion_r243902213 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala --- @@ -24,59 +24,88 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.datamap.{DataMapStoreManager, TableDataMap} import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.datamap.CarbonMergeBloomIndexFilesRDD import org.apache.carbondata.events._ +import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent +import org.apache.carbondata.processing.merger.CarbonDataMergerUtil class MergeBloomIndexEventListener extends OperationEventListener with Logging { val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) override def onEvent(event: Event, operationContext: OperationContext): Unit = { + val sparkSession = SparkSession.getActiveSession.get event match { + case loadPreStatusUpdateEvent: LoadTablePreStatusUpdateEvent => + // For loading process, segment can not be accessed at this time + val loadModel = loadPreStatusUpdateEvent.getCarbonLoadModel + val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable + val segmentId = loadModel.getSegmentId + + // filter out bloom datamap, skip lazy datamap + val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala + .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase( + DataMapClassProvider.BLOOMFILTER.getShortName)) + .filter(!_.getDataMapSchema.isLazy).toList + + mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, Seq(segmentId)) + + case compactPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent => + // For compact process, segment can not be accessed at this time + val carbonTable = compactPreStatusUpdateEvent.carbonTable + val mergedLoadName = compactPreStatusUpdateEvent.mergedLoadName + val segmentId = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName) + + // filter out bloom datamap, skip lazy datamap + val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala + .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase( + DataMapClassProvider.BLOOMFILTER.getShortName)) + .filter(!_.getDataMapSchema.isLazy).toList + + mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, Seq(segmentId)) + case datamapPostEvent: BuildDataMapPostExecutionEvent => - LOGGER.info("Load post status event-listener called for merge bloom index") + // For rebuild datamap process, datamap is disabled when rebuilding + if (!datamapPostEvent.isFromRebuild || null == datamapPostEvent.dmName) { + // ignore datamapPostEvent from loading and compaction + return + } + val carbonTableIdentifier = datamapPostEvent.identifier val carbonTable = DataMapStoreManager.getInstance().getCarbonTable(carbonTableIdentifier) - val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable) - val sparkSession = SparkSession.getActiveSession.get - // filter out bloom datamap - var bloomDatamaps = tableDataMaps.asScala.filter( - _.getDataMapSchema.getProviderName.equalsIgnoreCase( + // filter out current rebuilt bloom datamap + val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala + 
.filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase( DataMapClassProvider.BLOOMFILTER.getShortName)) - - if (datamapPostEvent.isFromRebuild) { - if (null != datamapPostEvent.dmName) { - // for rebuild process - bloomDatamaps = bloomDatamaps.filter( - _.getDataMapSchema.getDataMapName.equalsIgnoreCase(datamapPostEvent.dmName)) - } - } else { - // for load process, skip lazy datamap - bloomDatamaps = bloomDatamaps.filter(!_.getDataMapSchema.isLazy) - } + .filter(_.getDataMapSchema.getDataMapName.equalsIgnoreCase(datamapPostEvent.dmName)) + .toList val segmentIds = datamapPostEvent.segmentIdList - if (bloomDatamaps.size > 0 && segmentIds.size > 0) { - // we extract bloom datamap name and index columns here - // because TableDataMap is not serializable - val bloomDMnames = ListBuffer.empty[String] - val bloomIndexColumns = ListBuffer.empty[Seq[String]] - bloomDatamaps.foreach( dm => { - bloomDMnames += dm.getDataMapSchema.getDataMapName - bloomIndexColumns += dm.getDataMapSchema.getIndexColumns.map(_.trim.toLowerCase) - }) - new CarbonMergeBloomIndexFilesRDD(sparkSession, carbonTable, - segmentIds, bloomDMnames, bloomIndexColumns).collect() - } + mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, segmentIds) } } - private def clearBloomCache(carbonTable: CarbonTable, segmentIds: Seq[String]): Unit = { - DataMapStoreManager.getInstance.clearDataMaps(carbonTable.getTableUniqueName) + private def mergeBloomIndex(sparkSession: SparkSession, carbonTable: CarbonTable, + bloomDatamaps: List[TableDataMap], segmentIds: Seq[String]) = { + if (bloomDatamaps.nonEmpty && segmentIds.nonEmpty) { + // we extract bloom datamap name and index columns here + // because TableDataMap is not serializable + val bloomDMnames = ListBuffer.empty[String] + val bloomIndexColumns = ListBuffer.empty[Seq[String]] + bloomDatamaps.foreach(dm => { + bloomDMnames += dm.getDataMapSchema.getDataMapName + bloomIndexColumns += dm.getDataMapSchema.getIndexColumns.map(_.trim.toLowerCase) + }) + LOGGER.info("Start to merge bloom index file") --- End diff -- it's better to add the key message on the three logs, such as tablename , segmentids,bloomDatamaps --- |
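A minimal sketch of the kind of enriched message being requested here, assuming it sits inside the `mergeBloomIndex` helper from the diff, where `carbonTable`, `segmentIds` and `bloomDMnames` are already in scope; the exact wording committed later may differ:

```scala
// illustrative only: carry the table name, segment ids and datamap names in the log
LOGGER.info(s"Start to merge bloom index files for table ${carbonTable.getTableUniqueName}, " +
  s"segments ${segmentIds.mkString("[", ", ", "]")}, " +
  s"datamaps ${bloomDMnames.mkString("[", ", ", "]")}")
```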
Github user kevinjmh commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3023#discussion_r243905231 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala --- @@ -24,59 +24,88 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.datamap.{DataMapStoreManager, TableDataMap} import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.datamap.CarbonMergeBloomIndexFilesRDD import org.apache.carbondata.events._ +import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent +import org.apache.carbondata.processing.merger.CarbonDataMergerUtil class MergeBloomIndexEventListener extends OperationEventListener with Logging { val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) override def onEvent(event: Event, operationContext: OperationContext): Unit = { + val sparkSession = SparkSession.getActiveSession.get event match { + case loadPreStatusUpdateEvent: LoadTablePreStatusUpdateEvent => + // For loading process, segment can not be accessed at this time + val loadModel = loadPreStatusUpdateEvent.getCarbonLoadModel + val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable + val segmentId = loadModel.getSegmentId + + // filter out bloom datamap, skip lazy datamap + val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala + .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase( + DataMapClassProvider.BLOOMFILTER.getShortName)) + .filter(!_.getDataMapSchema.isLazy).toList + + mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, Seq(segmentId)) + + case compactPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent => + // For compact process, segment can not be accessed at this time + val carbonTable = compactPreStatusUpdateEvent.carbonTable + val mergedLoadName = compactPreStatusUpdateEvent.mergedLoadName + val segmentId = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName) + + // filter out bloom datamap, skip lazy datamap + val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala + .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase( + DataMapClassProvider.BLOOMFILTER.getShortName)) + .filter(!_.getDataMapSchema.isLazy).toList + + mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, Seq(segmentId)) + case datamapPostEvent: BuildDataMapPostExecutionEvent => - LOGGER.info("Load post status event-listener called for merge bloom index") + // For rebuild datamap process, datamap is disabled when rebuilding + if (!datamapPostEvent.isFromRebuild || null == datamapPostEvent.dmName) { + // ignore datamapPostEvent from loading and compaction + return + } + val carbonTableIdentifier = datamapPostEvent.identifier val carbonTable = DataMapStoreManager.getInstance().getCarbonTable(carbonTableIdentifier) - val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable) - val sparkSession = SparkSession.getActiveSession.get - // filter out bloom datamap - var bloomDatamaps = tableDataMaps.asScala.filter( - _.getDataMapSchema.getProviderName.equalsIgnoreCase( + // filter out current rebuilt bloom datamap + val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala + 
.filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase( DataMapClassProvider.BLOOMFILTER.getShortName)) - - if (datamapPostEvent.isFromRebuild) { - if (null != datamapPostEvent.dmName) { - // for rebuild process - bloomDatamaps = bloomDatamaps.filter( - _.getDataMapSchema.getDataMapName.equalsIgnoreCase(datamapPostEvent.dmName)) - } - } else { - // for load process, skip lazy datamap - bloomDatamaps = bloomDatamaps.filter(!_.getDataMapSchema.isLazy) --- End diff -- The original implementation handled all three scenarios (load/compact/rebuild), so it used if-else to distinguish them. Now each scenario corresponds to a different event, and only the rebuild path deals with lazy bloom datamaps. --- |
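For context on the "lazy" distinction discussed above: in CarbonData a datamap is lazy when it is created with `WITH DEFERRED REBUILD`, so it is built only by an explicit rebuild command rather than during load or compaction, which is why only the rebuild path needs to handle lazy bloom datamaps. The sketch below uses hypothetical table and datamap names, and the DDL is recalled from the CarbonData 1.5 documentation, so treat the exact syntax as an assumption to verify:

```scala
import org.apache.spark.sql.SparkSession

// assumes a CarbonData-enabled SparkSession is already available
val spark: SparkSession = SparkSession.builder().getOrCreate()

// a deferred ("lazy") bloom datamap: not built while data is loaded
spark.sql(
  """CREATE DATAMAP dm_bloom_city ON TABLE sales
    |USING 'bloomfilter'
    |WITH DEFERRED REBUILD
    |DMPROPERTIES ('INDEX_COLUMNS' = 'city', 'BLOOM_SIZE' = '640000')""".stripMargin)

// built only on explicit rebuild; the merge listener then reacts to the
// BuildDataMapPostExecutionEvent fired for this rebuild
spark.sql("REBUILD DATAMAP dm_bloom_city ON TABLE sales")
```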
Github user kevinjmh commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3023#discussion_r243905862 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala --- @@ -24,59 +24,88 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.datamap.{DataMapStoreManager, TableDataMap} import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.datamap.CarbonMergeBloomIndexFilesRDD import org.apache.carbondata.events._ +import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent +import org.apache.carbondata.processing.merger.CarbonDataMergerUtil class MergeBloomIndexEventListener extends OperationEventListener with Logging { val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) override def onEvent(event: Event, operationContext: OperationContext): Unit = { + val sparkSession = SparkSession.getActiveSession.get event match { + case loadPreStatusUpdateEvent: LoadTablePreStatusUpdateEvent => + // For loading process, segment can not be accessed at this time + val loadModel = loadPreStatusUpdateEvent.getCarbonLoadModel + val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable + val segmentId = loadModel.getSegmentId + + // filter out bloom datamap, skip lazy datamap + val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala + .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase( + DataMapClassProvider.BLOOMFILTER.getShortName)) + .filter(!_.getDataMapSchema.isLazy).toList + + mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, Seq(segmentId)) + + case compactPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent => + // For compact process, segment can not be accessed at this time + val carbonTable = compactPreStatusUpdateEvent.carbonTable + val mergedLoadName = compactPreStatusUpdateEvent.mergedLoadName + val segmentId = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName) + + // filter out bloom datamap, skip lazy datamap + val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala + .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase( + DataMapClassProvider.BLOOMFILTER.getShortName)) + .filter(!_.getDataMapSchema.isLazy).toList + + mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, Seq(segmentId)) + case datamapPostEvent: BuildDataMapPostExecutionEvent => - LOGGER.info("Load post status event-listener called for merge bloom index") + // For rebuild datamap process, datamap is disabled when rebuilding + if (!datamapPostEvent.isFromRebuild || null == datamapPostEvent.dmName) { + // ignore datamapPostEvent from loading and compaction --- End diff -- what information do you expect? ignore datamapPostEvent event from loading and compaction because the bloom index files is already merged by above event. --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3023 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1937/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3023 Build Success with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10190/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3023 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2146/ --- |
Github user qiuchenjian commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3023#discussion_r243927801 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala --- @@ -24,59 +24,88 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.datamap.{DataMapStoreManager, TableDataMap} import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.datamap.CarbonMergeBloomIndexFilesRDD import org.apache.carbondata.events._ +import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent +import org.apache.carbondata.processing.merger.CarbonDataMergerUtil class MergeBloomIndexEventListener extends OperationEventListener with Logging { val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) override def onEvent(event: Event, operationContext: OperationContext): Unit = { + val sparkSession = SparkSession.getActiveSession.get event match { + case loadPreStatusUpdateEvent: LoadTablePreStatusUpdateEvent => + // For loading process, segment can not be accessed at this time + val loadModel = loadPreStatusUpdateEvent.getCarbonLoadModel + val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable + val segmentId = loadModel.getSegmentId + + // filter out bloom datamap, skip lazy datamap + val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala + .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase( + DataMapClassProvider.BLOOMFILTER.getShortName)) + .filter(!_.getDataMapSchema.isLazy).toList + + mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, Seq(segmentId)) + + case compactPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent => + // For compact process, segment can not be accessed at this time + val carbonTable = compactPreStatusUpdateEvent.carbonTable + val mergedLoadName = compactPreStatusUpdateEvent.mergedLoadName + val segmentId = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName) + + // filter out bloom datamap, skip lazy datamap + val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala + .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase( + DataMapClassProvider.BLOOMFILTER.getShortName)) + .filter(!_.getDataMapSchema.isLazy).toList + + mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, Seq(segmentId)) + case datamapPostEvent: BuildDataMapPostExecutionEvent => - LOGGER.info("Load post status event-listener called for merge bloom index") + // For rebuild datamap process, datamap is disabled when rebuilding + if (!datamapPostEvent.isFromRebuild || null == datamapPostEvent.dmName) { + // ignore datamapPostEvent from loading and compaction --- End diff -- ```suggestion LOGGER.info("Ignore datamapPostEvent from loading and compaction"); // ignore datamapPostEvent from loading and compaction ``` --- |
Github user kevinjmh commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3023#discussion_r243928449 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala --- @@ -24,59 +24,88 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.datamap.{DataMapStoreManager, TableDataMap} import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.datamap.CarbonMergeBloomIndexFilesRDD import org.apache.carbondata.events._ +import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent +import org.apache.carbondata.processing.merger.CarbonDataMergerUtil class MergeBloomIndexEventListener extends OperationEventListener with Logging { val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) override def onEvent(event: Event, operationContext: OperationContext): Unit = { + val sparkSession = SparkSession.getActiveSession.get event match { + case loadPreStatusUpdateEvent: LoadTablePreStatusUpdateEvent => + // For loading process, segment can not be accessed at this time + val loadModel = loadPreStatusUpdateEvent.getCarbonLoadModel + val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable + val segmentId = loadModel.getSegmentId + + // filter out bloom datamap, skip lazy datamap + val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala + .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase( + DataMapClassProvider.BLOOMFILTER.getShortName)) + .filter(!_.getDataMapSchema.isLazy).toList + + mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, Seq(segmentId)) + + case compactPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent => + // For compact process, segment can not be accessed at this time + val carbonTable = compactPreStatusUpdateEvent.carbonTable + val mergedLoadName = compactPreStatusUpdateEvent.mergedLoadName + val segmentId = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName) + + // filter out bloom datamap, skip lazy datamap + val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala + .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase( + DataMapClassProvider.BLOOMFILTER.getShortName)) + .filter(!_.getDataMapSchema.isLazy).toList + + mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, Seq(segmentId)) + case datamapPostEvent: BuildDataMapPostExecutionEvent => - LOGGER.info("Load post status event-listener called for merge bloom index") + // For rebuild datamap process, datamap is disabled when rebuilding + if (!datamapPostEvent.isFromRebuild || null == datamapPostEvent.dmName) { + // ignore datamapPostEvent from loading and compaction --- End diff -- Added --- |
Github user kevinjmh commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3023#discussion_r243928456 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala --- @@ -24,59 +24,88 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.datamap.{DataMapStoreManager, TableDataMap} import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.datamap.CarbonMergeBloomIndexFilesRDD import org.apache.carbondata.events._ +import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent +import org.apache.carbondata.processing.merger.CarbonDataMergerUtil class MergeBloomIndexEventListener extends OperationEventListener with Logging { val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) override def onEvent(event: Event, operationContext: OperationContext): Unit = { + val sparkSession = SparkSession.getActiveSession.get event match { + case loadPreStatusUpdateEvent: LoadTablePreStatusUpdateEvent => + // For loading process, segment can not be accessed at this time + val loadModel = loadPreStatusUpdateEvent.getCarbonLoadModel + val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable + val segmentId = loadModel.getSegmentId + + // filter out bloom datamap, skip lazy datamap + val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala + .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase( + DataMapClassProvider.BLOOMFILTER.getShortName)) + .filter(!_.getDataMapSchema.isLazy).toList + + mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, Seq(segmentId)) + + case compactPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent => + // For compact process, segment can not be accessed at this time + val carbonTable = compactPreStatusUpdateEvent.carbonTable + val mergedLoadName = compactPreStatusUpdateEvent.mergedLoadName + val segmentId = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName) + + // filter out bloom datamap, skip lazy datamap + val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala + .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase( + DataMapClassProvider.BLOOMFILTER.getShortName)) + .filter(!_.getDataMapSchema.isLazy).toList + + mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, Seq(segmentId)) + case datamapPostEvent: BuildDataMapPostExecutionEvent => - LOGGER.info("Load post status event-listener called for merge bloom index") + // For rebuild datamap process, datamap is disabled when rebuilding + if (!datamapPostEvent.isFromRebuild || null == datamapPostEvent.dmName) { + // ignore datamapPostEvent from loading and compaction + return + } + val carbonTableIdentifier = datamapPostEvent.identifier val carbonTable = DataMapStoreManager.getInstance().getCarbonTable(carbonTableIdentifier) - val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable) - val sparkSession = SparkSession.getActiveSession.get - // filter out bloom datamap - var bloomDatamaps = tableDataMaps.asScala.filter( - _.getDataMapSchema.getProviderName.equalsIgnoreCase( + // filter out current rebuilt bloom datamap + val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala + 
.filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase( DataMapClassProvider.BLOOMFILTER.getShortName)) - - if (datamapPostEvent.isFromRebuild) { - if (null != datamapPostEvent.dmName) { - // for rebuild process - bloomDatamaps = bloomDatamaps.filter( - _.getDataMapSchema.getDataMapName.equalsIgnoreCase(datamapPostEvent.dmName)) - } - } else { - // for load process, skip lazy datamap - bloomDatamaps = bloomDatamaps.filter(!_.getDataMapSchema.isLazy) - } + .filter(_.getDataMapSchema.getDataMapName.equalsIgnoreCase(datamapPostEvent.dmName)) + .toList val segmentIds = datamapPostEvent.segmentIdList - if (bloomDatamaps.size > 0 && segmentIds.size > 0) { - // we extract bloom datamap name and index columns here - // because TableDataMap is not serializable - val bloomDMnames = ListBuffer.empty[String] - val bloomIndexColumns = ListBuffer.empty[Seq[String]] - bloomDatamaps.foreach( dm => { - bloomDMnames += dm.getDataMapSchema.getDataMapName - bloomIndexColumns += dm.getDataMapSchema.getIndexColumns.map(_.trim.toLowerCase) - }) - new CarbonMergeBloomIndexFilesRDD(sparkSession, carbonTable, - segmentIds, bloomDMnames, bloomIndexColumns).collect() - } + mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, segmentIds) } } - private def clearBloomCache(carbonTable: CarbonTable, segmentIds: Seq[String]): Unit = { - DataMapStoreManager.getInstance.clearDataMaps(carbonTable.getTableUniqueName) + private def mergeBloomIndex(sparkSession: SparkSession, carbonTable: CarbonTable, + bloomDatamaps: List[TableDataMap], segmentIds: Seq[String]) = { + if (bloomDatamaps.nonEmpty && segmentIds.nonEmpty) { + // we extract bloom datamap name and index columns here + // because TableDataMap is not serializable + val bloomDMnames = ListBuffer.empty[String] + val bloomIndexColumns = ListBuffer.empty[Seq[String]] + bloomDatamaps.foreach(dm => { + bloomDMnames += dm.getDataMapSchema.getDataMapName + bloomIndexColumns += dm.getDataMapSchema.getIndexColumns.map(_.trim.toLowerCase) + }) + LOGGER.info("Start to merge bloom index file") --- End diff -- fixed --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3023 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1940/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3023 Build Success with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10193/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3023 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2149/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3023 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1951/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3023 Build Success with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10204/ --- |