[GitHub] [carbondata] manishgupta88 commented on a change in pull request #3281: [WIP]Index server performance improvement


GitBox
manishgupta88 commented on a change in pull request #3281: [WIP]Index server performance improvement
URL: https://github.com/apache/carbondata/pull/3281#discussion_r293713073
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
 ##########
 @@ -47,24 +51,36 @@ class DistributedDataMapJob extends AbstractDataMapJob {
       val messageSize = SizeEstimator.estimate(dataMapFormat)
       LOGGER.debug(s"Size of message sent to Index Server: $messageSize")
     }
+    val queryId = SparkSQLUtil.getSparkSession.sparkContext.getConf
+      .get("queryId", UUID.randomUUID().toString)
+    dataMapFormat.setQueryId(queryId)
+    val tmpFolder = CarbonUtil
+      .createTempFolderForIndexServer(dataMapFormat.getCarbonTable.getTablePath, queryId)
     val (resonse, time) = logTime {
-      val spark = SparkSQLUtil.getSparkSession
-      val taskGroupId = spark.sparkContext.getLocalProperty("spark.jobGroup.id") match {
-        case null => ""
-        case _ => spark.sparkContext.getLocalProperty("spark.jobGroup.id")
+      try {
+        val spark = SparkSQLUtil.getSparkSession
+        val taskGroupId = spark.sparkContext.getLocalProperty("spark.jobGroup.id") match {
+          case null => ""
+          case _ => spark.sparkContext.getLocalProperty("spark.jobGroup.id")
+        }
+        val taskGroupDesc = spark.sparkContext.getLocalProperty("spark.job.description") match {
+          case null => ""
+          case _ => spark.sparkContext.getLocalProperty("spark.job.description")
+        }
+        dataMapFormat.setTaskGroupId(taskGroupId)
+        dataMapFormat.setTaskGroupDesc(taskGroupDesc)
+        var filterInf = dataMapFormat.getFilterResolverIntf
+        val filterProcessor = new FilterExpressionProcessor
+        filterInf = removeSparkUnknown(filterInf,
+          dataMapFormat.getCarbonTable.getAbsoluteTableIdentifier, filterProcessor)
+        dataMapFormat.setFilterResolverIntf(filterInf)
+        IndexServer.getClient.getSplits(dataMapFormat)
+          .getExtendedBlockets(dataMapFormat.getCarbonTable.getTablePath, dataMapFormat.getQueryId)
+      } finally {
+        if (null != tmpFolder && !tmpFolder.delete()) {
+          LOGGER.info("Problem while deleting the temp directory" + tmpFolder.getAbsolutePath)
 
 Review comment:
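   1. Change this to LOGGER.error, since failing to delete the temp directory is an error condition.
   2. Add a space after the constant message so the path is not fused to the text.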
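
A minimal sketch of what the two points above could look like, written as a standalone snippet so it can be tried outside the PR. The names `TempFolderCleanup`, `deleteFailureMessage`, and `cleanup` are hypothetical and not part of CarbonData; the `logError` parameter stands in for `LOGGER.error`.

```scala
import java.io.File

// Hypothetical helper, not CarbonData code: illustrates the review suggestions only.
object TempFolderCleanup {

  // Point 2: separate the constant message from the path (here with ": ").
  def deleteFailureMessage(folder: File): String =
    s"Problem while deleting the temp directory: ${folder.getAbsolutePath}"

  // Point 1: report a failed delete through an error-level logger.
  // `logError` is an assumed stand-in for LOGGER.error.
  // Returns true when there was nothing to delete or the delete succeeded.
  def cleanup(folder: File, logError: String => Unit): Boolean = {
    if (folder != null && !folder.delete()) {
      logError(deleteFailureMessage(folder))
      false
    } else {
      true
    }
  }
}
```

In the `finally` block this would reduce to something like `TempFolderCleanup.cleanup(tmpFolder, msg => LOGGER.error(msg))`. Note that `File.delete()` returns false for a non-empty directory, so the error path is also hit if the index server left files behind.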

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[hidden email]


With regards,
Apache Git Services