Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3023

Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2226/

---
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3023#discussion_r245469143

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala ---

    @@ -24,59 +24,96 @@
     import org.apache.spark.internal.Logging
     import org.apache.spark.sql.SparkSession

     import org.apache.carbondata.common.logging.LogServiceFactory
    -import org.apache.carbondata.core.datamap.DataMapStoreManager
    +import org.apache.carbondata.core.datamap.{DataMapStoreManager, TableDataMap}
     import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
     import org.apache.carbondata.core.metadata.schema.table.CarbonTable
     import org.apache.carbondata.datamap.CarbonMergeBloomIndexFilesRDD
     import org.apache.carbondata.events._
    +import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent
    +import org.apache.carbondata.processing.merger.CarbonDataMergerUtil

     class MergeBloomIndexEventListener extends OperationEventListener with Logging {
       val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

       override def onEvent(event: Event, operationContext: OperationContext): Unit = {
    +    val sparkSession = SparkSession.getActiveSession.get
         event match {
    +      case loadPreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =>
    +        LOGGER.info("LoadTablePreStatusUpdateEvent called for bloom index merging")
    +        // For loading process, segment can not be accessed at this time
    +        val loadModel = loadPreStatusUpdateEvent.getCarbonLoadModel
    +        val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable
    +        val segmentId = loadModel.getSegmentId
    +
    +        // filter out bloom datamap, skip lazy datamap
    +        val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala
    +          .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase(
    +            DataMapClassProvider.BLOOMFILTER.getShortName))
    +          .filter(!_.getDataMapSchema.isLazy).toList
    +
    +        mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, Seq(segmentId))
    +
    +      case compactPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent =>
    +        LOGGER.info("AlterTableCompactionPreStatusUpdateEvent called for bloom index merging")
    +        // For compact process, segment can not be accessed at this time
    +        val carbonTable = compactPreStatusUpdateEvent.carbonTable

--- End diff --

It seems the following code block duplicates lines #44~#54; please consider optimizing it.

---
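To illustrate the suggestion above, a minimal sketch of one way the duplicated filtering could be pulled into a shared helper; the method name and placement are hypothetical and not part of the PR under review:

    // Hypothetical helper inside MergeBloomIndexEventListener, not part of the PR.
    // Both the load and the compaction branches could call this instead of repeating
    // the provider-name and lazy-datamap filter chain.
    private def getNonLazyBloomDatamaps(carbonTable: CarbonTable): List[TableDataMap] = {
      DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala
        .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase(
          DataMapClassProvider.BLOOMFILTER.getShortName))
        .filter(!_.getDataMapSchema.isLazy)
        .toList
    }

With such a helper, both event cases reduce to a single call like mergeBloomIndex(sparkSession, carbonTable, getNonLazyBloomDatamaps(carbonTable), Seq(segmentId)).

---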
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3023#discussion_r245469201

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---

    @@ -184,6 +184,9 @@ object CarbonEnv {
           .addListener(classOf[LoadTablePostExecutionEvent], new MergeIndexEventListener)
           .addListener(classOf[AlterTableCompactionPostEvent], new MergeIndexEventListener)
           .addListener(classOf[AlterTableMergeIndexEvent], new MergeIndexEventListener)
    +      .addListener(classOf[LoadTablePreStatusUpdateEvent], new MergeBloomIndexEventListener)

--- End diff --

After adding this line, the segment will not be visible to the user until the mergeBloomIndex procedure finishes, right?

---
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3023#discussion_r245469279

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala ---

    +      case compactPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent =>
    +        LOGGER.info("AlterTableCompactionPreStatusUpdateEvent called for bloom index merging")
    +        // For compact process, segment can not be accessed at this time
    +        val carbonTable = compactPreStatusUpdateEvent.carbonTable
    +        val mergedLoadName = compactPreStatusUpdateEvent.mergedLoadName
    +        val segmentId = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName)
    +
    +        // filter out bloom datamap, skip lazy datamap
    +        val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala
    +          .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase(
    +            DataMapClassProvider.BLOOMFILTER.getShortName))
    +          .filter(!_.getDataMapSchema.isLazy).toList
    +
    +        mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, Seq(segmentId))
    +
           case datamapPostEvent: BuildDataMapPostExecutionEvent =>
    -        LOGGER.info("Load post status event-listener called for merge bloom index")
    +        LOGGER.info("BuildDataMapPostExecutionEvent called for bloom index merging")
    +        // For rebuild datamap process, datamap is disabled when rebuilding
    +        if (!datamapPostEvent.isFromRebuild || null == datamapPostEvent.dmName) {
    +          // ignore datamapPostEvent from loading and compaction for bloom index merging
    +          // they use LoadTablePreStatusUpdateEvent and AlterTableCompactionPreStatusUpdateEvent
    +          LOGGER.info("Ignore BuildDataMapPostExecutionEvent from loading and compaction")
    +          return
    +        }
    +
             val carbonTableIdentifier = datamapPostEvent.identifier
             val carbonTable = DataMapStoreManager.getInstance().getCarbonTable(carbonTableIdentifier)
    -        val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable)
    -        val sparkSession = SparkSession.getActiveSession.get
    -        // filter out bloom datamap
    -        var bloomDatamaps = tableDataMaps.asScala.filter(
    -          _.getDataMapSchema.getProviderName.equalsIgnoreCase(
    +        // filter out current rebuilt bloom datamap
    +        val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala
    +          .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase(
                 DataMapClassProvider.BLOOMFILTER.getShortName))
    -
    -        if (datamapPostEvent.isFromRebuild) {
    -          if (null != datamapPostEvent.dmName) {
    -            // for rebuild process
    -            bloomDatamaps = bloomDatamaps.filter(
    -              _.getDataMapSchema.getDataMapName.equalsIgnoreCase(datamapPostEvent.dmName))
    -          }
    -        } else {
    -          // for load process, skip lazy datamap
    -          bloomDatamaps = bloomDatamaps.filter(!_.getDataMapSchema.isLazy)
    -        }
    +          .filter(_.getDataMapSchema.getDataMapName.equalsIgnoreCase(datamapPostEvent.dmName))
    +          .toList

             val segmentIds = datamapPostEvent.segmentIdList
    -        if (bloomDatamaps.size > 0 && segmentIds.size > 0) {
    -          // we extract bloom datamap name and index columns here
    -          // because TableDataMap is not serializable
    -          val bloomDMnames = ListBuffer.empty[String]
    -          val bloomIndexColumns = ListBuffer.empty[Seq[String]]
    -          bloomDatamaps.foreach( dm => {
    -            bloomDMnames += dm.getDataMapSchema.getDataMapName
    -            bloomIndexColumns += dm.getDataMapSchema.getIndexColumns.map(_.trim.toLowerCase)
    -          })
    -          new CarbonMergeBloomIndexFilesRDD(sparkSession, carbonTable,
    -            segmentIds, bloomDMnames, bloomIndexColumns).collect()
    -        }
    +        mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, segmentIds)
         }
       }

    -  private def clearBloomCache(carbonTable: CarbonTable, segmentIds: Seq[String]): Unit = {
    -    DataMapStoreManager.getInstance.clearDataMaps(carbonTable.getTableUniqueName)
    +  private def mergeBloomIndex(sparkSession: SparkSession, carbonTable: CarbonTable,
    +      bloomDatamaps: List[TableDataMap], segmentIds: Seq[String]) = {
    +    if (bloomDatamaps.nonEmpty && segmentIds.nonEmpty) {
    +      // we extract bloom datamap name and index columns here
    +      // because TableDataMap is not serializable
    +      val bloomDMnames = ListBuffer.empty[String]
    +      val bloomIndexColumns = ListBuffer.empty[Seq[String]]
    +      bloomDatamaps.foreach(dm => {
    +        bloomDMnames += dm.getDataMapSchema.getDataMapName
    +        bloomIndexColumns += dm.getDataMapSchema.getIndexColumns.map(_.trim.toLowerCase)
    +      })
    +      LOGGER.info(
    +        String.format("Start to merge bloom index file for table %s. Datamaps=%s, SegmentIds=%s",
    +          carbonTable.getTableName, bloomDMnames.mkString("|"), segmentIds.mkString("|")))
    +      new CarbonMergeBloomIndexFilesRDD(sparkSession, carbonTable,
    +        segmentIds, bloomDMnames, bloomIndexColumns).collect()
    +      LOGGER.info("Finish merging bloom index file for table " + carbonTable.getTableName)
    +    } else {
    +      LOGGER.info("No segment of any bloom datamap needs to merge bloom index file for table "

--- End diff --

I think we can just skip this ELSE branch. For a normal table without a bloom datamap, we do not need to bother the user with this information.

---
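A minimal sketch of what the helper might look like with that suggestion applied; the body is taken from the diff above, with only the else branch dropped so tables without bloom datamaps produce no log noise. It assumes the method stays inside MergeBloomIndexEventListener, where LOGGER and the imports shown in the diff are already in scope:

    // Sketch based on the mergeBloomIndex helper in the diff above, with the else branch removed.
    private def mergeBloomIndex(sparkSession: SparkSession, carbonTable: CarbonTable,
        bloomDatamaps: List[TableDataMap], segmentIds: Seq[String]): Unit = {
      if (bloomDatamaps.nonEmpty && segmentIds.nonEmpty) {
        // extract datamap names and index columns here because TableDataMap is not serializable
        val bloomDMnames = ListBuffer.empty[String]
        val bloomIndexColumns = ListBuffer.empty[Seq[String]]
        bloomDatamaps.foreach { dm =>
          bloomDMnames += dm.getDataMapSchema.getDataMapName
          bloomIndexColumns += dm.getDataMapSchema.getIndexColumns.map(_.trim.toLowerCase)
        }
        LOGGER.info(String.format(
          "Start to merge bloom index file for table %s. Datamaps=%s, SegmentIds=%s",
          carbonTable.getTableName, bloomDMnames.mkString("|"), segmentIds.mkString("|")))
        new CarbonMergeBloomIndexFilesRDD(sparkSession, carbonTable,
          segmentIds, bloomDMnames, bloomIndexColumns).collect()
        LOGGER.info("Finish merging bloom index file for table " + carbonTable.getTableName)
      }
      // No else branch: tables without bloom datamaps, or with nothing to merge, are skipped silently.
    }

---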
Github user xuchuanyin commented on the issue:
https://github.com/apache/carbondata/pull/3023

Besides, I think the title of the PR could be improved to 'Include the merging bloomindex procedure in data loading transaction' -- just for your reference.

---
Github user kevinjmh commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3023#discussion_r245480061

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---

    +      .addListener(classOf[LoadTablePreStatusUpdateEvent], new MergeBloomIndexEventListener)

--- End diff --

Yes. This is what this PR wants to do.

---
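In other words, because the listener is registered on the pre-status-update event, the bloom index merge runs inside the load transaction. A rough sketch of the ordering; the addListener line is from the PR, while the surrounding flow is paraphrased and the OperationListenerBus receiver is assumed, since it is not shown in the quoted hunk:

    // Registration as added by the PR (receiver assumed to be CarbonData's OperationListenerBus):
    OperationListenerBus.getInstance()
      .addListener(classOf[LoadTablePreStatusUpdateEvent], new MergeBloomIndexEventListener)

    // Paraphrased load flow:
    //   1. data and index files for the new segment are written
    //   2. LoadTablePreStatusUpdateEvent fires -> MergeBloomIndexEventListener merges the
    //      segment's bloom index files
    //   3. only then is the segment status updated, which is when it becomes visible to
    //      queries -- this is what makes the merge part of the load transaction

---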
Github user kevinjmh commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3023#discussion_r245480089

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala ---

    +    } else {
    +      LOGGER.info("No segment of any bloom datamap needs to merge bloom index file for table "

--- End diff --

OK, I'll remove this.

---
Github user kevinjmh commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3023#discussion_r245480571

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala ---

    +      case compactPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent =>
    +        LOGGER.info("AlterTableCompactionPreStatusUpdateEvent called for bloom index merging")
    +        // For compact process, segment can not be accessed at this time
    +        val carbonTable = compactPreStatusUpdateEvent.carbonTable

--- End diff --

The code follows similar steps in both branches. For L44~L46, it depends on what info we can get from the event. For L49~L52, do you mean we can extract a common method that gets the datamaps of a table with additional constraints?

---
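To make the question concrete, here is a hypothetical shape for such a method; the name, signature, and call sites are illustrative only and not part of the PR:

    // Hypothetical helper: the extra constraint distinguishes the load/compaction case
    // (skip lazy datamaps) from the rebuild case (keep only the datamap being rebuilt).
    private def getBloomDatamaps(carbonTable: CarbonTable,
        constraint: TableDataMap => Boolean): List[TableDataMap] = {
      DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala
        .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase(
          DataMapClassProvider.BLOOMFILTER.getShortName))
        .filter(constraint)
        .toList
    }

    // load / compaction:
    //   getBloomDatamaps(carbonTable, !_.getDataMapSchema.isLazy)
    // rebuild:
    //   getBloomDatamaps(carbonTable, _.getDataMapSchema.getDataMapName.equalsIgnoreCase(dmName))

---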
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3023

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2174/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3023

Build Success with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10430/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3023

Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2389/

---
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3023#discussion_r245509427

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala ---

    +      case compactPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent =>
    +        LOGGER.info("AlterTableCompactionPreStatusUpdateEvent called for bloom index merging")
    +        // For compact process, segment can not be accessed at this time
    +        val carbonTable = compactPreStatusUpdateEvent.carbonTable

--- End diff --

Fine, just keep it as it is.

---
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3023#discussion_r245509625

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---

    +      .addListener(classOf[LoadTablePreStatusUpdateEvent], new MergeBloomIndexEventListener)

--- End diff --

Yeah, that is how it is supposed to be. Besides, I think it is the framework's responsibility to include this procedure in the data-loading transaction, so the best practice would be to optimize the framework's behavior. @jackylk, what do you think about this? Should we do it in this PR or later?

---
Github user kevinjmh commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3023#discussion_r246663278

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---

    +      .addListener(classOf[LoadTablePreStatusUpdateEvent], new MergeBloomIndexEventListener)

--- End diff --

@jackylk @ravipesala @KanakaKumar Please check and review.

---