dhatchayani commented on a change in pull request #3221: [CARBONDATA-3386] Concurrent Merge index and query is failing
URL: https://github.com/apache/carbondata/pull/3221#discussion_r284648773

##########
File path: integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
##########
@@ -23,36 +23,33 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable

 import org.apache.spark.internal.Logging
-import org.apache.spark.SparkContext
 import org.apache.spark.rdd.CarbonMergeFilesRDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.util.CarbonException
+import org.apache.spark.util.MergeIndexUtil

 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
-import org.apache.carbondata.core.metadata.SegmentFileStore
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableMergeIndexEvent, Event, OperationContext, OperationEventListener}
-import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePostExecutionEvent
+import org.apache.carbondata.events._
+import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent
 import org.apache.carbondata.processing.merger.CarbonDataMergerUtil

 class MergeIndexEventListener extends OperationEventListener with Logging {
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

   override def onEvent(event: Event, operationContext: OperationContext): Unit = {
     event match {
-      case preStatusUpdateEvent: LoadTablePostExecutionEvent =>
+      case preStatusUpdateEvent: LoadTablePreStatusUpdateEvent =>
         LOGGER.info("Load post status event-listener called for merge index")
         val loadModel = preStatusUpdateEvent.getCarbonLoadModel
         val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable
         val compactedSegments = loadModel.getMergedSegmentIds
         val sparkSession = SparkSession.getActiveSession.get
         if(!carbonTable.isStreamingSink) {
           if (null != compactedSegments && !compactedSegments.isEmpty) {
-            mergeIndexFilesForCompactedSegments(sparkSession,
+            MergeIndexUtil.mergeIndexFilesForCompactedSegments(sparkSession,

Review comment:
   Needed in case of auto compaction: a load can itself trigger compaction, so the compacted segments must be handled here as well.
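For readers following the thread, the branch under discussion roughly amounts to the sketch below. It is a minimal illustration assembled from the diff above; the helper name onLoadPreStatusUpdate and the trailing arguments to MergeIndexUtil.mergeIndexFilesForCompactedSegments (the diff is cut off after "(sparkSession,") are assumptions, not taken from the source.

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.util.MergeIndexUtil
  import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent

  // Hypothetical helper mirroring the LoadTablePreStatusUpdateEvent branch of onEvent.
  def onLoadPreStatusUpdate(event: LoadTablePreStatusUpdateEvent): Unit = {
    val loadModel = event.getCarbonLoadModel
    val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable
    // With auto compaction enabled, the load itself can trigger a compaction, so the
    // load model may already carry the ids of the merged (compacted) segments.
    val compactedSegments = loadModel.getMergedSegmentIds
    val sparkSession = SparkSession.getActiveSession.get
    if (!carbonTable.isStreamingSink) {
      if (null != compactedSegments && !compactedSegments.isEmpty) {
        // Merge the index files of the compacted segments within the same load flow;
        // the extra arguments here are assumed, since the diff above is truncated.
        MergeIndexUtil.mergeIndexFilesForCompactedSegments(sparkSession, carbonTable, compactedSegments)
      }
    }
  }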