[GitHub] carbondata pull request #3023: [CARBONDATA-3197][BloomDataMap] Merge bloom i...


[GitHub] carbondata issue #3023: [CARBONDATA-3197][BloomDataMap] Merge bloom index be...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/3023
 
    Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2226/



---

[GitHub] carbondata pull request #3023: [CARBONDATA-3197][BloomDataMap] Merge bloom i...

Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/3023#discussion_r245469143
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala ---
    @@ -24,59 +24,96 @@ import org.apache.spark.internal.Logging
     import org.apache.spark.sql.SparkSession
     
     import org.apache.carbondata.common.logging.LogServiceFactory
    -import org.apache.carbondata.core.datamap.DataMapStoreManager
    +import org.apache.carbondata.core.datamap.{DataMapStoreManager, TableDataMap}
     import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
     import org.apache.carbondata.core.metadata.schema.table.CarbonTable
     import org.apache.carbondata.datamap.CarbonMergeBloomIndexFilesRDD
     import org.apache.carbondata.events._
    +import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent
    +import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
     
     class MergeBloomIndexEventListener extends OperationEventListener with Logging {
       val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     
       override def onEvent(event: Event, operationContext: OperationContext): Unit = {
    +    val sparkSession = SparkSession.getActiveSession.get
         event match {
    +      case loadPreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =>
    +        LOGGER.info("LoadTablePreStatusUpdateEvent called for bloom index merging")
    +        // For loading process, segment can not be accessed at this time
    +        val loadModel = loadPreStatusUpdateEvent.getCarbonLoadModel
    +        val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable
    +        val segmentId = loadModel.getSegmentId
    +
    +        // filter out bloom datamap, skip lazy datamap
    +        val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala
    +          .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase(
    +            DataMapClassProvider.BLOOMFILTER.getShortName))
    +          .filter(!_.getDataMapSchema.isLazy).toList
    +
    +        mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, Seq(segmentId))
    +
    +      case compactPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent =>
    +        LOGGER.info("AlterTableCompactionPreStatusUpdateEvent called for bloom index merging")
    +        // For compact process, segment can not be accessed at this time
    +        val carbonTable = compactPreStatusUpdateEvent.carbonTable
    --- End diff --
   
    It seems the following code block duplicates lines #44~#54; please consider optimizing that.


---

[GitHub] carbondata pull request #3023: [CARBONDATA-3197][BloomDataMap] Merge bloom i...

Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/3023#discussion_r245469201
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---
    @@ -184,6 +184,9 @@ object CarbonEnv {
           .addListener(classOf[LoadTablePostExecutionEvent], new MergeIndexEventListener)
           .addListener(classOf[AlterTableCompactionPostEvent], new MergeIndexEventListener)
           .addListener(classOf[AlterTableMergeIndexEvent], new MergeIndexEventListener)
    +      .addListener(classOf[LoadTablePreStatusUpdateEvent], new MergeBloomIndexEventListener)
    --- End diff --
   
    After adding this line, the segment will not be visible to the user until the mergeBloomIndex procedure finishes, right?


---

[GitHub] carbondata pull request #3023: [CARBONDATA-3197][BloomDataMap] Merge bloom i...

Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/3023#discussion_r245469279
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala ---
    @@ -24,59 +24,96 @@ import org.apache.spark.internal.Logging
     import org.apache.spark.sql.SparkSession
     
     import org.apache.carbondata.common.logging.LogServiceFactory
    -import org.apache.carbondata.core.datamap.DataMapStoreManager
    +import org.apache.carbondata.core.datamap.{DataMapStoreManager, TableDataMap}
     import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
     import org.apache.carbondata.core.metadata.schema.table.CarbonTable
     import org.apache.carbondata.datamap.CarbonMergeBloomIndexFilesRDD
     import org.apache.carbondata.events._
    +import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent
    +import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
     
     class MergeBloomIndexEventListener extends OperationEventListener with Logging {
       val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     
       override def onEvent(event: Event, operationContext: OperationContext): Unit = {
    +    val sparkSession = SparkSession.getActiveSession.get
         event match {
    +      case loadPreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =>
    +        LOGGER.info("LoadTablePreStatusUpdateEvent called for bloom index merging")
    +        // For loading process, segment can not be accessed at this time
    +        val loadModel = loadPreStatusUpdateEvent.getCarbonLoadModel
    +        val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable
    +        val segmentId = loadModel.getSegmentId
    +
    +        // filter out bloom datamap, skip lazy datamap
    +        val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala
    +          .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase(
    +            DataMapClassProvider.BLOOMFILTER.getShortName))
    +          .filter(!_.getDataMapSchema.isLazy).toList
    +
    +        mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, Seq(segmentId))
    +
    +      case compactPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent =>
    +        LOGGER.info("AlterTableCompactionPreStatusUpdateEvent called for bloom index merging")
    +        // For compact process, segment can not be accessed at this time
    +        val carbonTable = compactPreStatusUpdateEvent.carbonTable
    +        val mergedLoadName = compactPreStatusUpdateEvent.mergedLoadName
    +        val segmentId = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName)
    +
    +        // filter out bloom datamap, skip lazy datamap
    +        val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala
    +          .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase(
    +            DataMapClassProvider.BLOOMFILTER.getShortName))
    +          .filter(!_.getDataMapSchema.isLazy).toList
    +
    +        mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, Seq(segmentId))
    +
           case datamapPostEvent: BuildDataMapPostExecutionEvent =>
    -        LOGGER.info("Load post status event-listener called for merge bloom index")
    +        LOGGER.info("BuildDataMapPostExecutionEvent called for bloom index merging")
    +        // For rebuild datamap process, datamap is disabled when rebuilding
    +        if (!datamapPostEvent.isFromRebuild || null == datamapPostEvent.dmName) {
    +          // ignore datamapPostEvent from loading and compaction for bloom index merging
    +          // they use LoadTablePreStatusUpdateEvent and AlterTableCompactionPreStatusUpdateEvent
    +          LOGGER.info("Ignore BuildDataMapPostExecutionEvent from loading and compaction")
    +          return
    +        }
    +
             val carbonTableIdentifier = datamapPostEvent.identifier
             val carbonTable = DataMapStoreManager.getInstance().getCarbonTable(carbonTableIdentifier)
    -        val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable)
    -        val sparkSession = SparkSession.getActiveSession.get
     
    -        // filter out bloom datamap
    -        var bloomDatamaps = tableDataMaps.asScala.filter(
    -          _.getDataMapSchema.getProviderName.equalsIgnoreCase(
    +        // filter out current rebuilt bloom datamap
    +        val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala
    +          .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase(
                 DataMapClassProvider.BLOOMFILTER.getShortName))
    -
    -        if (datamapPostEvent.isFromRebuild) {
    -          if (null != datamapPostEvent.dmName) {
    -            // for rebuild process
    -            bloomDatamaps = bloomDatamaps.filter(
    -              _.getDataMapSchema.getDataMapName.equalsIgnoreCase(datamapPostEvent.dmName))
    -          }
    -        } else {
    -          // for load process, skip lazy datamap
    -          bloomDatamaps = bloomDatamaps.filter(!_.getDataMapSchema.isLazy)
    -        }
    +          .filter(_.getDataMapSchema.getDataMapName.equalsIgnoreCase(datamapPostEvent.dmName))
    +          .toList
     
             val segmentIds = datamapPostEvent.segmentIdList
    -        if (bloomDatamaps.size > 0 && segmentIds.size > 0) {
    -          // we extract bloom datamap name and index columns here
    -          // because TableDataMap is not serializable
    -          val bloomDMnames = ListBuffer.empty[String]
    -          val bloomIndexColumns = ListBuffer.empty[Seq[String]]
    -          bloomDatamaps.foreach( dm => {
    -            bloomDMnames += dm.getDataMapSchema.getDataMapName
    -            bloomIndexColumns += dm.getDataMapSchema.getIndexColumns.map(_.trim.toLowerCase)
    -          })
    -          new CarbonMergeBloomIndexFilesRDD(sparkSession, carbonTable,
    -            segmentIds, bloomDMnames, bloomIndexColumns).collect()
    -        }
    +        mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, segmentIds)
         }
       }
     
     
    -  private def clearBloomCache(carbonTable: CarbonTable, segmentIds: Seq[String]): Unit = {
    -    DataMapStoreManager.getInstance.clearDataMaps(carbonTable.getTableUniqueName)
    +  private def mergeBloomIndex(sparkSession: SparkSession, carbonTable: CarbonTable,
    +      bloomDatamaps: List[TableDataMap], segmentIds: Seq[String]) = {
    +    if (bloomDatamaps.nonEmpty && segmentIds.nonEmpty) {
    +      // we extract bloom datamap name and index columns here
    +      // because TableDataMap is not serializable
    +      val bloomDMnames = ListBuffer.empty[String]
    +      val bloomIndexColumns = ListBuffer.empty[Seq[String]]
    +      bloomDatamaps.foreach(dm => {
    +        bloomDMnames += dm.getDataMapSchema.getDataMapName
    +        bloomIndexColumns += dm.getDataMapSchema.getIndexColumns.map(_.trim.toLowerCase)
    +      })
    +      LOGGER.info(
    +        String.format("Start to merge bloom index file for table %s. Datamaps=%s, SegmentIds=%s",
    +          carbonTable.getTableName, bloomDMnames.mkString("|"), segmentIds.mkString("|") ))
    +      new CarbonMergeBloomIndexFilesRDD(sparkSession, carbonTable,
    +        segmentIds, bloomDMnames, bloomIndexColumns).collect()
    +      LOGGER.info("Finish merging bloom index file for table " + carbonTable.getTableName)
    +    } else {
    +      LOGGER.info("No segment of any bloom datamap needs to merge bloom index file for table "
    --- End diff --
   
    I think we can just drop this ELSE branch. For a normal table without a bloom datamap, we do not need to bother the user with this information.
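    For illustration, a guard-clause version of mergeBloomIndex with the ELSE branch dropped might look like the sketch below (based on the diff above, inside the same listener class; not the final committed code):

        private def mergeBloomIndex(sparkSession: SparkSession, carbonTable: CarbonTable,
            bloomDatamaps: List[TableDataMap], segmentIds: Seq[String]): Unit = {
          // nothing to merge for tables without applicable bloom datamaps or segments;
          // return silently instead of logging, so normal tables are not bothered
          if (bloomDatamaps.isEmpty || segmentIds.isEmpty) {
            return
          }
          // we extract bloom datamap name and index columns here
          // because TableDataMap is not serializable
          val bloomDMnames = ListBuffer.empty[String]
          val bloomIndexColumns = ListBuffer.empty[Seq[String]]
          bloomDatamaps.foreach { dm =>
            bloomDMnames += dm.getDataMapSchema.getDataMapName
            bloomIndexColumns += dm.getDataMapSchema.getIndexColumns.map(_.trim.toLowerCase)
          }
          LOGGER.info(
            String.format("Start to merge bloom index file for table %s. Datamaps=%s, SegmentIds=%s",
              carbonTable.getTableName, bloomDMnames.mkString("|"), segmentIds.mkString("|")))
          new CarbonMergeBloomIndexFilesRDD(sparkSession, carbonTable,
            segmentIds, bloomDMnames, bloomIndexColumns).collect()
          LOGGER.info("Finish merging bloom index file for table " + carbonTable.getTableName)
        }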


---

[GitHub] carbondata issue #3023: [CARBONDATA-3197][BloomDataMap] Merge bloom index be...

Github user xuchuanyin commented on the issue:

    https://github.com/apache/carbondata/pull/3023
 
    Besides, I think the title of the PR could be improved to 'Include the merging bloomindex procedure in data loading transaction' -- just for your reference.


---

[GitHub] carbondata pull request #3023: [CARBONDATA-3197][BloomDataMap] Merge bloom i...

Github user kevinjmh commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/3023#discussion_r245480061
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---
    @@ -184,6 +184,9 @@ object CarbonEnv {
           .addListener(classOf[LoadTablePostExecutionEvent], new MergeIndexEventListener)
           .addListener(classOf[AlterTableCompactionPostEvent], new MergeIndexEventListener)
           .addListener(classOf[AlterTableMergeIndexEvent], new MergeIndexEventListener)
    +      .addListener(classOf[LoadTablePreStatusUpdateEvent], new MergeBloomIndexEventListener)
    --- End diff --
   
    Yes. This is what this PR wants to do.


---

[GitHub] carbondata pull request #3023: [CARBONDATA-3197][BloomDataMap] Merge bloom i...

Github user kevinjmh commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/3023#discussion_r245480089
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala ---
    +      LOGGER.info("Finish merging bloom index file for table " + carbonTable.getTableName)
    +    } else {
    +      LOGGER.info("No segment of any bloom datamap needs to merge bloom index file for table "
    --- End diff --
   
    OK, I'll remove this


---

[GitHub] carbondata pull request #3023: [CARBONDATA-3197][BloomDataMap] Include bloom...

Github user kevinjmh commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/3023#discussion_r245480571
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala ---
    +      case compactPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent =>
    +        LOGGER.info("AlterTableCompactionPreStatusUpdateEvent called for bloom index merging")
    +        // For compact process, segment can not be accessed at this time
    +        val carbonTable = compactPreStatusUpdateEvent.carbonTable
    --- End diff --
   
    The code follows similar steps. For L44~L46, it depends on what info we can get from the event. For L49~L52, do you mean we could extract a common method that gets the datamaps of a table with additional constraints?
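    For reference, such a helper might look like the sketch below (the name getBloomDatamaps is hypothetical, just to illustrate the extraction; it reuses the imports already present in the file):

        // Hypothetical helper: collect the bloom datamaps of a table,
        // optionally skipping lazy ones (as needed for load/compaction).
        private def getBloomDatamaps(carbonTable: CarbonTable,
            skipLazy: Boolean): List[TableDataMap] = {
          DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala
            .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase(
              DataMapClassProvider.BLOOMFILTER.getShortName))
            .filter(dm => !skipLazy || !dm.getDataMapSchema.isLazy)
            .toList
        }

    Both the load and compaction cases could then call getBloomDatamaps(carbonTable, skipLazy = true), and the rebuild case could filter the result by datamapPostEvent.dmName.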


---

[GitHub] carbondata issue #3023: [CARBONDATA-3197][BloomDataMap] Include bloomindex m...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/3023
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2174/



---

[GitHub] carbondata issue #3023: [CARBONDATA-3197][BloomDataMap] Include bloomindex m...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/3023
 
    Build Success with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10430/



---

[GitHub] carbondata issue #3023: [CARBONDATA-3197][BloomDataMap] Include bloomindex m...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/3023
 
    Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2389/



---

[GitHub] carbondata pull request #3023: [CARBONDATA-3197][BloomDataMap] Include bloom...

Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/3023#discussion_r245509427
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala ---
    +      case compactPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent =>
    +        LOGGER.info("AlterTableCompactionPreStatusUpdateEvent called for bloom index merging")
    +        // For compact process, segment can not be accessed at this time
    +        val carbonTable = compactPreStatusUpdateEvent.carbonTable
    --- End diff --
   
    Fine, just keep it as it is.


---

[GitHub] carbondata pull request #3023: [CARBONDATA-3197][BloomDataMap] Include bloom...

Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/3023#discussion_r245509625
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---
    @@ -184,6 +184,9 @@ object CarbonEnv {
           .addListener(classOf[LoadTablePostExecutionEvent], new MergeIndexEventListener)
           .addListener(classOf[AlterTableCompactionPostEvent], new MergeIndexEventListener)
           .addListener(classOf[AlterTableMergeIndexEvent], new MergeIndexEventListener)
    +      .addListener(classOf[LoadTablePreStatusUpdateEvent], new MergeBloomIndexEventListener)
    --- End diff --
   
    Yeah, that is how it is supposed to be.

    Besides, I think it is the framework's responsibility to include this procedure in the data-loading transaction, so the best practice would be to optimize the framework's behavior.

    @jackylk What do you think about this? Should we do this in this PR or later?


---

[GitHub] carbondata pull request #3023: [CARBONDATA-3197][BloomDataMap] Include bloom...

Github user kevinjmh commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/3023#discussion_r246663278
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---
    @@ -184,6 +184,9 @@ object CarbonEnv {
           .addListener(classOf[LoadTablePostExecutionEvent], new MergeIndexEventListener)
           .addListener(classOf[AlterTableCompactionPostEvent], new MergeIndexEventListener)
           .addListener(classOf[AlterTableMergeIndexEvent], new MergeIndexEventListener)
    +      .addListener(classOf[LoadTablePreStatusUpdateEvent], new MergeBloomIndexEventListener)
    --- End diff --
   
    @jackylk @ravipesala @KanakaKumar Please check and review


---