[GitHub] [carbondata] ravipesala commented on a change in pull request #3281: [WIP]Index server performance improvement

Classic | List | Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] ravipesala commented on a change in pull request #3281: [WIP]Index server performance improvement

GitBox
ravipesala commented on a change in pull request #3281: [WIP]Index server performance improvement
URL: https://github.com/apache/carbondata/pull/3281#discussion_r293267208
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
 ##########
 @@ -19,98 +19,166 @@ package org.apache.carbondata.indexserver
 
 import java.text.SimpleDateFormat
 import java.util.Date
+import java.util.concurrent.Executors
 
 import scala.collection.JavaConverters._
+import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
+import scala.concurrent.duration.Duration
 
-import org.apache.hadoop.mapred.TaskAttemptID
+import org.apache.commons.lang.StringUtils
+import org.apache.hadoop.mapred.{RecordReader, TaskAttemptID}
 import org.apache.hadoop.mapreduce.{InputSplit, Job, TaskType}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.spark.{Partition, SparkEnv, TaskContext, TaskKilledException}
+import org.apache.spark.{Partition, SparkEnv, TaskContext}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.hive.DistributionUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.CacheProvider
-import org.apache.carbondata.core.datamap.DistributableDataMapFormat
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapStoreManager, DistributableDataMapFormat, TableDataMap}
+import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.indexstore.ExtendedBlocklet
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.indexstore.{ExtendedBlocklet, ExtendedBlockletWrapper}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonThreadFactory}
 import org.apache.carbondata.spark.rdd.CarbonRDD
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
-class DataMapRDDPartition(rddId: Int, idx: Int, val inputSplit: InputSplit)
+class DataMapRDDPartition(rddId: Int,
+    idx: Int,
+    val inputSplit: Seq[InputSplit],
+    location: Array[String])
   extends Partition {
 
   override def index: Int = idx
 
   override def hashCode(): Int = 41 * (41 + rddId) + idx
+
+  def getLocations: Array[String] = {
+    location
+  }
 }
 
 private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkSession,
     dataMapFormat: DistributableDataMapFormat)
-  extends CarbonRDD[(String, ExtendedBlocklet)](ss, Nil) {
+  extends CarbonRDD[(String, ExtendedBlockletWrapper)](ss, Nil) {
 
   @transient private val LOGGER = LogServiceFactory.getLogService(classOf[DistributedPruneRDD]
     .getName)
-
   private val jobTrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     formatter.format(new Date())
   }
-
-  override protected def getPreferredLocations(split: Partition): Seq[String] = {
-    if (split.asInstanceOf[DataMapRDDPartition].inputSplit.getLocations != null) {
-      split.asInstanceOf[DataMapRDDPartition].inputSplit.getLocations.toSeq
-    } else {
-      Seq()
+  var readers: scala.collection.Iterator[RecordReader[Void, ExtendedBlocklet]] = _
+
+  private def clearInvalidDataMaps(segmentNo: List[String]): Unit = {
 
 Review comment:
   Please move this method (`clearInvalidDataMaps`) to DataMapStoreManager and synchronize it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[hidden email]


With regards,
Apache Git Services