[GitHub] [carbondata] QiangCai opened a new pull request #3535: [WIP] Refactory data loading for partition table

[GitHub] [carbondata] QiangCai commented on a change in pull request #3535: [WIP] Refactory data loading for partition table

GitBox
QiangCai commented on a change in pull request #3535: [WIP] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#discussion_r361888379
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
 ##########
 @@ -181,17 +264,39 @@ class CarbonMergeFilesRDD(
     }
   }
 
-  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[String] = {
+  override def internalCompute(theSplit: Partition,
+      context: TaskContext): Iterator[(String, SegmentFileStore.SegmentFile)] = {
     val tablePath = carbonTable.getTablePath
-    val iter = new Iterator[String] {
+    val iter = new Iterator[(String, SegmentFileStore.SegmentFile)] {
       val split = theSplit.asInstanceOf[CarbonMergeFilePartition]
       logInfo("Merging carbon index files of segment : " +
               CarbonTablePath.getSegmentPath(tablePath, split.segmentId))
 
-      if (isHivePartitionedTable) {
+      var segmentFile: SegmentFileStore.SegmentFile = null
+      var indexSize: String = ""
+      if (isHivePartitionedTable && partitionInfo.isEmpty) {
         CarbonLoaderUtil
           .mergeIndexFilesInPartitionedSegment(carbonTable, split.segmentId,
             segmentFileNameToSegmentIdMap.get(split.segmentId), split.partitionPath)
+      } else if (isHivePartitionedTable && !partitionInfo.isEmpty) {
+        val folderDetails = CarbonLoaderUtil
+          .mergeIndexFilesInPartitionedTempSegment(carbonTable,
 
 Review comment:
   done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[hidden email]


With regards,
Apache Git Services

[GitHub] [carbondata] QiangCai commented on a change in pull request #3535: [WIP] Refactory data loading for partition table

GitBox
In reply to this post by GitBox
QiangCai commented on a change in pull request #3535: [WIP] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#discussion_r361888397
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
 ##########
 @@ -118,6 +147,8 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
                 carbonTable = carbonMainTable,
                 mergeIndexProperty = true,
                 readFileFooterFromCarbonDataFile = true)
+              LOGGER.info("Total time taken for merge index "
+                          + (System.currentTimeMillis() - startTime))
 
 Review comment:
   accepted
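
The added log line reports elapsed wall-clock time as the difference of two `System.currentTimeMillis()` readings. A minimal sketch of that pattern follows; the `timed` helper and the `MergeIndexTiming` object are hypothetical names used only for illustration and are not part of CarbonData. The sketch also prints a unit, which the message in the diff does not.

```scala
object MergeIndexTiming {
  // Hypothetical helper (not a CarbonData API): evaluates `body` once and
  // returns its result together with the elapsed wall-clock milliseconds.
  def timed[A](body: => A): (A, Long) = {
    val startTime = System.currentTimeMillis()
    val result = body
    (result, System.currentTimeMillis() - startTime)
  }

  def main(args: Array[String]): Unit = {
    // Stand-in workload; in the PR the timed section is the merge index call.
    val (total, elapsedMs) = timed { (1 to 1000).sum }
    println(s"Total time taken for merge index: $elapsedMs ms (result = $total)")
  }
}
```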


[GitHub] [carbondata] QiangCai commented on a change in pull request #3535: [WIP] Refactory data loading for partition table

GitBox
In reply to this post by GitBox
QiangCai commented on a change in pull request #3535: [WIP] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#discussion_r361888399
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
 ##########
 @@ -206,16 +215,193 @@ with Serializable {
 
 case class CarbonSQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Boolean)
   extends SQLHadoopMapReduceCommitProtocol(jobId, path, isAppend) {
+
+  override def setupTask(taskContext: TaskAttemptContext): Unit = {
+    if (isCarbonDataFlow(taskContext.getConfiguration)) {
+      ThreadLocalSessionInfo.setConfigurationToCurrentThread(taskContext.getConfiguration)
+    }
+    super.setupTask(taskContext)
+  }
+
+  override def commitJob(jobContext: JobContext,
+      taskCommits: Seq[TaskCommitMessage]): Unit = {
+    if (isCarbonDataFlow(jobContext.getConfiguration)) {
+      var dataSize = 0L
+      val partitions =
+        taskCommits
+          .flatMap { taskCommit =>
+            taskCommit.obj match {
+              case (map: Map[String, String], _) =>
+                val partition = map.get("carbon.partitions")
+                val size = map.get("carbon.datasize")
+                if (size.isDefined) {
+                  dataSize = dataSize + java.lang.Long.parseLong(size.get)
+                }
+                if (partition.isDefined) {
+                  ObjectSerializationUtil
+                    .convertStringToObject(partition.get)
+                    .asInstanceOf[util.ArrayList[String]]
+                    .asScala
+                } else {
+                  Array.empty[String]
+                }
+              case _ => Array.empty[String]
+            }
+          }
+          .distinct
+          .toList
+          .asJava
+
+      jobContext.getConfiguration.set(
+        "carbon.output.partitions.name",
+        ObjectSerializationUtil.convertObjectToString(partitions))
+      jobContext.getConfiguration.set("carbon.datasize", dataSize.toString)
+
+      val newTaskCommits = taskCommits.map { taskCommit =>
+        taskCommit.obj match {
+          case (map: Map[String, String], set) =>
+            new TaskCommitMessage(
+              map
+                .filterNot(e => "carbon.partitions".equals(e._1) || "carbon.datasize".equals(e._1)),
+              set)
+          case _ => taskCommit
+        }
+      }
+      super
+        .commitJob(jobContext, newTaskCommits)
+    } else {
+      super
+        .commitJob(jobContext, taskCommits)
+    }
+  }
+
+  override def commitTask(
 
 Review comment:
   done
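
The `commitJob` override in the hunk above collects the distinct partitions and the total data size from the task commit messages, then strips the two Carbon-specific keys before delegating to the parent protocol. A minimal sketch of that aggregation, without Spark or Hadoop on the classpath, might look as follows. `CommitJobSketch`, `TaskPayload` and `aggregate` are hypothetical names, and the comma-separated partition encoding is a simplification: the PR serializes a `java.util.ArrayList` through `ObjectSerializationUtil`.

```scala
object CommitJobSketch {
  // Keys exactly as they appear in the diff.
  val PartitionsKey = "carbon.partitions"
  val DataSizeKey = "carbon.datasize"

  // Hypothetical stand-in for the (Map, Set) payload of a task commit message.
  type TaskPayload = (Map[String, String], Set[String])

  // Returns (distinct partitions, total data size, payloads without Carbon keys).
  def aggregate(payloads: Seq[TaskPayload]): (Seq[String], Long, Seq[TaskPayload]) = {
    val partitions = payloads
      .flatMap { case (map, _) => map.get(PartitionsKey).toSeq.flatMap(_.split(",").toSeq) }
      .filter(_.nonEmpty)
      .distinct
    // Tasks that reported no size simply contribute nothing to the sum.
    val dataSize = payloads
      .flatMap { case (map, _) => map.get(DataSizeKey) }
      .map(_.toLong)
      .sum
    // Remove the Carbon-specific entries so the parent protocol sees only its own keys.
    val cleaned = payloads.map { case (map, set) =>
      (map.filterNot { case (key, _) => key == PartitionsKey || key == DataSizeKey }, set)
    }
    (partitions, dataSize, cleaned)
  }

  def main(args: Array[String]): Unit = {
    val tasks: Seq[TaskPayload] = Seq(
      (Map(PartitionsKey -> "country=US,country=IN", DataSizeKey -> "100", "other" -> "x"),
        Set("/tmp/a")),
      (Map(PartitionsKey -> "country=US", DataSizeKey -> "50"), Set.empty[String]))
    val (partitions, dataSize, cleaned) = aggregate(tasks)
    println(s"partitions=$partitions dataSize=$dataSize cleaned=$cleaned")
  }
}
```

Unlike the diff, which accumulates `dataSize` in a `var` updated inside `flatMap`, the sketch computes the sum in a separate pass so that no closure has side effects.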


[GitHub] [carbondata] QiangCai commented on a change in pull request #3535: [WIP] Refactory data loading for partition table

GitBox
In reply to this post by GitBox
QiangCai commented on a change in pull request #3535: [WIP] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#discussion_r361888406
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
 ##########
 @@ -206,16 +215,193 @@ with Serializable {
 
 case class CarbonSQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Boolean)
   extends SQLHadoopMapReduceCommitProtocol(jobId, path, isAppend) {
+
+  override def setupTask(taskContext: TaskAttemptContext): Unit = {
+    if (isCarbonDataFlow(taskContext.getConfiguration)) {
+      ThreadLocalSessionInfo.setConfigurationToCurrentThread(taskContext.getConfiguration)
+    }
+    super.setupTask(taskContext)
+  }
+
+  override def commitJob(jobContext: JobContext,
 
 Review comment:
   done


[GitHub] [carbondata] QiangCai commented on a change in pull request #3535: [WIP] Refactory data loading for partition table

GitBox
In reply to this post by GitBox
QiangCai commented on a change in pull request #3535: [WIP] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#discussion_r361888415
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
 ##########
 @@ -206,16 +215,193 @@ with Serializable {
 
 case class CarbonSQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Boolean)
   extends SQLHadoopMapReduceCommitProtocol(jobId, path, isAppend) {
+
+  override def setupTask(taskContext: TaskAttemptContext): Unit = {
+    if (isCarbonDataFlow(taskContext.getConfiguration)) {
+      ThreadLocalSessionInfo.setConfigurationToCurrentThread(taskContext.getConfiguration)
+    }
+    super.setupTask(taskContext)
+  }
+
+  override def commitJob(jobContext: JobContext,
+      taskCommits: Seq[TaskCommitMessage]): Unit = {
+    if (isCarbonDataFlow(jobContext.getConfiguration)) {
+      var dataSize = 0L
+      val partitions =
+        taskCommits
+          .flatMap { taskCommit =>
+            taskCommit.obj match {
+              case (map: Map[String, String], _) =>
+                val partition = map.get("carbon.partitions")
+                val size = map.get("carbon.datasize")
+                if (size.isDefined) {
+                  dataSize = dataSize + java.lang.Long.parseLong(size.get)
+                }
+                if (partition.isDefined) {
+                  ObjectSerializationUtil
+                    .convertStringToObject(partition.get)
+                    .asInstanceOf[util.ArrayList[String]]
+                    .asScala
+                } else {
+                  Array.empty[String]
+                }
+              case _ => Array.empty[String]
+            }
+          }
+          .distinct
+          .toList
+          .asJava
+
+      jobContext.getConfiguration.set(
+        "carbon.output.partitions.name",
+        ObjectSerializationUtil.convertObjectToString(partitions))
+      jobContext.getConfiguration.set("carbon.datasize", dataSize.toString)
+
+      val newTaskCommits = taskCommits.map { taskCommit =>
+        taskCommit.obj match {
+          case (map: Map[String, String], set) =>
+            new TaskCommitMessage(
+              map
+                .filterNot(e => "carbon.partitions".equals(e._1) || "carbon.datasize".equals(e._1)),
+              set)
+          case _ => taskCommit
+        }
+      }
+      super
+        .commitJob(jobContext, newTaskCommits)
+    } else {
+      super
+        .commitJob(jobContext, taskCommits)
+    }
+  }
+
+  override def commitTask(
+      taskContext: TaskAttemptContext
+  ): FileCommitProtocol.TaskCommitMessage = {
+    var taskMsg = super.commitTask(taskContext)
+    if (isCarbonDataFlow(taskContext.getConfiguration)) {
+      ThreadLocalSessionInfo.unsetAll()
+      val partitions: String = taskContext.getConfiguration.get("carbon.output.partitions.name", "")
+      val files = taskContext.getConfiguration.get("carbon.output.files.name", "")
+      var sum = 0L
+      var indexSize = 0L
+      if (!StringUtils.isEmpty(files)) {
+        val filesList = ObjectSerializationUtil
+          .convertStringToObject(files)
+          .asInstanceOf[util.ArrayList[String]]
+          .asScala
+        for (file <- filesList) {
+          if (file.contains(".carbondata")) {
+            sum += java.lang.Long.parseLong(file.substring(file.lastIndexOf(":") + 1))
+          } else if (file.contains(".carbonindex")) {
+            indexSize += java.lang.Long.parseLong(file.substring(file.lastIndexOf(":") + 1))
+          }
+        }
+      }
+      if (!StringUtils.isEmpty(partitions)) {
+        taskMsg = taskMsg.obj match {
+          case (map: Map[String, String], set) =>
+            new TaskCommitMessage(
+              map ++ Map("carbon.partitions" -> partitions, "carbon.datasize" -> sum.toString),
+              set)
+          case _ => taskMsg
+        }
+      }
+      // Update outputMetrics with carbondata and index size
+      TaskContext.get().taskMetrics().outputMetrics.setBytesWritten(sum + indexSize)
+    }
+    taskMsg
+  }
+
+  override def abortTask(taskContext: TaskAttemptContext): Unit = {
+    super.abortTask(taskContext)
+    if (isCarbonDataFlow(taskContext.getConfiguration)) {
+      val files = taskContext.getConfiguration.get("carbon.output.files.name", "")
+      if (!StringUtils.isEmpty(files)) {
+        val filesList = ObjectSerializationUtil
+          .convertStringToObject(files)
+          .asInstanceOf[util.ArrayList[String]]
+          .asScala
+        for (file <- filesList) {
+          val outputFile: String = file.substring(0, file.lastIndexOf(":"))
+          if (outputFile.endsWith(CarbonTablePath.CARBON_DATA_EXT)) {
+            FileFactory
+              .deleteAllCarbonFilesOfDir(FileFactory
+                .getCarbonFile(outputFile,
+                  taskContext.getConfiguration))
+          }
+        }
+      }
+      ThreadLocalSessionInfo.unsetAll()
+    }
+  }
+
   override def newTaskTempFileAbsPath(taskContext: TaskAttemptContext,
       absoluteDir: String,
       ext: String): String = {
-    val carbonFlow = taskContext.getConfiguration.get("carbon.commit.protocol")
-    if (carbonFlow != null) {
+    if (isCarbonFileFlow(taskContext.getConfiguration) ||
+        isCarbonDataFlow(taskContext.getConfiguration)) {
       super.newTaskTempFile(taskContext, Some(absoluteDir), ext)
     } else {
       super.newTaskTempFileAbsPath(taskContext, absoluteDir, ext)
     }
   }
+
+  override def newTaskTempFile(taskContext: TaskAttemptContext,
 
 Review comment:
   done
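
Judging from the `lastIndexOf(":")` calls in `commitTask` and `abortTask` above, each entry in `carbon.output.files.name` appears to have the form `path:size`. A minimal sketch of the size accounting under that assumption is shown below; `OutputFileSizes`, `parseEntry` and `totals` are hypothetical names, the example paths are made up, and the sketch assumes every entry contains at least one colon.

```scala
object OutputFileSizes {
  // Splits "path:size" at the LAST colon, so a path such as
  // "hdfs://host:8020/..." keeps the colons that belong to it.
  // Assumes the entry contains a colon; otherwise substring throws.
  def parseEntry(entry: String): (String, Long) = {
    val idx = entry.lastIndexOf(":")
    (entry.substring(0, idx), entry.substring(idx + 1).toLong)
  }

  // Returns (bytes in .carbondata files, bytes in .carbonindex files).
  def totals(entries: Seq[String]): (Long, Long) =
    entries.map(parseEntry).foldLeft((0L, 0L)) {
      case ((data, index), (path, size)) =>
        if (path.contains(".carbondata")) (data + size, index)
        else if (path.contains(".carbonindex")) (data, index + size)
        else (data, index) // other files are ignored, as in the diff
    }

  def main(args: Array[String]): Unit = {
    val entries = Seq(
      "hdfs://nn:8020/t/part-0.carbondata:1024",
      "hdfs://nn:8020/t/0.carbonindex:64",
      "hdfs://nn:8020/t/_SUCCESS:0")
    val (dataBytes, indexBytes) = totals(entries)
    // The diff reports dataBytes + indexBytes as the task's bytesWritten metric.
    println(s"data=$dataBytes index=$indexBytes written=${dataBytes + indexBytes}")
  }
}
```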


[GitHub] [carbondata] QiangCai commented on a change in pull request #3535: [WIP] Refactory data loading for partition table

GitBox
In reply to this post by GitBox
QiangCai commented on a change in pull request #3535: [WIP] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#discussion_r361888435
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
 ##########
 @@ -432,4 +541,77 @@ private class CarbonOutputWriter(path: String,
       Array.empty
     }
   }
+
+  def splitPartition(p: String): (String, String) = {
+    val value = p.substring(p.indexOf("=") + 1, p.length)
+    val col = p.substring(0, p.indexOf("="))
 +    // Null handling case: Hive creates null partitions with this special name
+    if (value.equals("__HIVE_DEFAULT_PARTITION__")) {
+      (col, null)
+      // we should replace back the special string with empty value.
+    } else if (value.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
+      (col, "")
+    } else {
+      (col, value)
+    }
+  }
+
+  def updatePartitions(
 
 Review comment:
   done
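
For reference, the behaviour of `splitPartition` in the hunk above can be sketched as a standalone function. This is an illustration under stated assumptions rather than the PR's code: `SplitPartitionSketch` is a hypothetical name, `MemberDefaultVal` is a placeholder standing in for `CarbonCommonConstants.MEMBER_DEFAULT_VAL` (the real value is defined in CarbonData), and the sketch returns `Option[String]` where the PR returns `null` for the Hive default partition.

```scala
object SplitPartitionSketch {
  // Name Hive uses for a partition whose value is null (taken from the diff).
  val HiveDefaultPartition = "__HIVE_DEFAULT_PARTITION__"
  // Assumption: placeholder for CarbonCommonConstants.MEMBER_DEFAULT_VAL.
  val MemberDefaultVal = "@NU#LL$!"

  // Splits "col=value" at the first '=' and maps the two special markers:
  // the Hive default partition becomes None, the Carbon marker becomes "".
  def splitPartition(p: String): (String, Option[String]) = {
    val idx = p.indexOf("=")
    val col = p.substring(0, idx)
    p.substring(idx + 1) match {
      case HiveDefaultPartition => (col, None)
      case MemberDefaultVal => (col, Some(""))
      case value => (col, Some(value))
    }
  }

  def main(args: Array[String]): Unit = {
    println(splitPartition("country=US"))
    println(splitPartition("dt=__HIVE_DEFAULT_PARTITION__"))
    println(splitPartition("name=" + MemberDefaultVal))
  }
}
```

Because the split happens at the first `=`, a value that itself contains `=` stays intact.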


[GitHub] [carbondata] CarbonDataQA1 commented on issue #3535: [WIP] Refactory data loading for partition table

GitBox
In reply to this post by GitBox
CarbonDataQA1 commented on issue #3535: [WIP] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#issuecomment-569572781
 
 
   Build Success with Spark 2.1.0, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.1/1342/
   


[GitHub] [carbondata] CarbonDataQA1 commented on issue #3535: [WIP] Refactory data loading for partition table

GitBox
In reply to this post by GitBox
CarbonDataQA1 commented on issue #3535: [WIP] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#issuecomment-569577867
 
 
   Build Failed with Spark 2.2.1, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.2/1352/
   


[GitHub] [carbondata] CarbonDataQA1 commented on issue #3535: [CARBONDATA-3641] Refactory data loading for partition table

GitBox
In reply to this post by GitBox
CarbonDataQA1 commented on issue #3535: [CARBONDATA-3641] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#issuecomment-569579090
 
 
   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1365/
   


[GitHub] [carbondata] jackylk commented on issue #3535: [CARBONDATA-3641] Refactory data loading for partition table

GitBox
In reply to this post by GitBox
jackylk commented on issue #3535: [CARBONDATA-3641] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#issuecomment-569579581
 
 
   LGTM
   spark 2.3 integration CI has passed


[GitHub] [carbondata] CarbonDataQA1 commented on issue #3535: [CARBONDATA-3641] Refactory data loading for partition table

GitBox
In reply to this post by GitBox
CarbonDataQA1 commented on issue #3535: [CARBONDATA-3641] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#issuecomment-569579636
 
 
   Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1368/
   


[GitHub] [carbondata] asfgit closed pull request #3535: [CARBONDATA-3641] Refactory data loading for partition table

GitBox
In reply to this post by GitBox
asfgit closed pull request #3535: [CARBONDATA-3641] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535
 
 
   


[GitHub] [carbondata] CarbonDataQA1 commented on issue #3535: [CARBONDATA-3641] Refactory data loading for partition table

GitBox
In reply to this post by GitBox
CarbonDataQA1 commented on issue #3535: [CARBONDATA-3641] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#issuecomment-569581817
 
 
   Build Success with Spark 2.1.0, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.1/1347/
   


[GitHub] [carbondata] CarbonDataQA1 commented on issue #3535: [CARBONDATA-3641] Refactory data loading for partition table

GitBox
In reply to this post by GitBox
CarbonDataQA1 commented on issue #3535: [CARBONDATA-3641] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#issuecomment-569589112
 
 
   Build Failed with Spark 2.2.1, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.2/1357/
   


[GitHub] [carbondata] brijoobopanna commented on issue #3535: [CARBONDATA-3641] Refactory data loading for partition table

GitBox
In reply to this post by GitBox
brijoobopanna commented on issue #3535: [CARBONDATA-3641] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#issuecomment-570160857
 
 
   @QiangCai please check whether this PR has introduced any CI issues in open source


[GitHub] [carbondata] akashrn5 commented on issue #3535: [CARBONDATA-3641] Refactory data loading for partition table

GitBox
In reply to this post by GitBox
akashrn5 commented on issue #3535: [CARBONDATA-3641] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#issuecomment-570162019
 
 
   @QiangCai I think the Spark 2.2 CI is continuously failing with this PR. Can you please have a look at it?
