ravipesala commented on a change in pull request #3281: [WIP]Index server performance improvement
URL: https://github.com/apache/carbondata/pull/3281#discussion_r293265467 ########## File path: integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala ########## @@ -19,98 +19,166 @@ package org.apache.carbondata.indexserver import java.text.SimpleDateFormat import java.util.Date +import java.util.concurrent.Executors import scala.collection.JavaConverters._ +import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future} +import scala.concurrent.duration.Duration -import org.apache.hadoop.mapred.TaskAttemptID +import org.apache.commons.lang.StringUtils +import org.apache.hadoop.mapred.{RecordReader, TaskAttemptID} import org.apache.hadoop.mapreduce.{InputSplit, Job, TaskType} import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl -import org.apache.spark.{Partition, SparkEnv, TaskContext, TaskKilledException} +import org.apache.spark.{Partition, SparkEnv, TaskContext} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.hive.DistributionUtil import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.cache.CacheProvider -import org.apache.carbondata.core.datamap.DistributableDataMapFormat +import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapStoreManager, DistributableDataMapFormat, TableDataMap} +import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.indexstore.ExtendedBlocklet -import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.indexstore.{ExtendedBlocklet, ExtendedBlockletWrapper} +import org.apache.carbondata.core.util.{CarbonProperties, CarbonThreadFactory} import org.apache.carbondata.spark.rdd.CarbonRDD import org.apache.carbondata.spark.util.CarbonScalaUtil -class DataMapRDDPartition(rddId: Int, idx: Int, val inputSplit: InputSplit) +class 
DataMapRDDPartition(rddId: Int, + idx: Int, + val inputSplit: Seq[InputSplit], + location: Array[String]) extends Partition { override def index: Int = idx override def hashCode(): Int = 41 * (41 + rddId) + idx + + def getLocations: Array[String] = { + location + } } private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkSession, dataMapFormat: DistributableDataMapFormat) - extends CarbonRDD[(String, ExtendedBlocklet)](ss, Nil) { + extends CarbonRDD[(String, ExtendedBlockletWrapper)](ss, Nil) { @transient private val LOGGER = LogServiceFactory.getLogService(classOf[DistributedPruneRDD] .getName) - private val jobTrackerId: String = { val formatter = new SimpleDateFormat("yyyyMMddHHmm") formatter.format(new Date()) } - - override protected def getPreferredLocations(split: Partition): Seq[String] = { - if (split.asInstanceOf[DataMapRDDPartition].inputSplit.getLocations != null) { - split.asInstanceOf[DataMapRDDPartition].inputSplit.getLocations.toSeq - } else { - Seq() + var readers: scala.collection.Iterator[RecordReader[Void, ExtendedBlocklet]] = _ + + private def clearInvalidDataMaps(segmentNo: List[String]): Unit = { + if (dataMapFormat.isJobToClearDataMaps) { + if (StringUtils.isNotEmpty(dataMapFormat.getDataMapToClear)) { + val dataMaps = DataMapStoreManager.getInstance + .getAllDataMap(dataMapFormat.getCarbonTable).asScala.collect { + case dataMap if dataMapFormat.getDataMapToClear + .equalsIgnoreCase(dataMap.getDataMapSchema.getDataMapName) => + segmentNo.foreach(segment => dataMap.deleteSegmentDatamapData(segment)) + dataMap.clear() + Nil + case others => List(others) + }.flatten + DataMapStoreManager.getInstance.getAllDataMaps + .put(dataMapFormat.getCarbonTable.getTableUniqueName, dataMaps.asJava) + } + else { Review comment: move up ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email]. With regards, Apache Git Services
Free forum by Nabble | Edit this page |