QiangCai commented on a change in pull request #3535: [WIP] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#discussion_r361888379

File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala

@@ -181,17 +264,39 @@ class CarbonMergeFilesRDD(
     }
   }

-  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[String] = {
+  override def internalCompute(theSplit: Partition,
+      context: TaskContext): Iterator[(String, SegmentFileStore.SegmentFile)] = {
     val tablePath = carbonTable.getTablePath
-    val iter = new Iterator[String] {
+    val iter = new Iterator[(String, SegmentFileStore.SegmentFile)] {
       val split = theSplit.asInstanceOf[CarbonMergeFilePartition]
       logInfo("Merging carbon index files of segment : " +
               CarbonTablePath.getSegmentPath(tablePath, split.segmentId))
-      if (isHivePartitionedTable) {
+      var segmentFile: SegmentFileStore.SegmentFile = null
+      var indexSize: String = ""
+      if (isHivePartitionedTable && partitionInfo.isEmpty) {
         CarbonLoaderUtil
           .mergeIndexFilesInPartitionedSegment(carbonTable, split.segmentId,
             segmentFileNameToSegmentIdMap.get(split.segmentId), split.partitionPath)
+      } else if (isHivePartitionedTable && !partitionInfo.isEmpty) {
+        val folderDetails = CarbonLoaderUtil
+          .mergeIndexFilesInPartitionedTempSegment(carbonTable,

Review comment: done
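For readers following the refactor, the sketch below is a minimal, hypothetical illustration (not the PR's code) of the one-shot iterator shape that such internalCompute implementations typically return once the merge work for a partition has been done; the helper name and type parameter are made up for illustration.

// Hypothetical sketch: an iterator that emits exactly one result, mirroring
// how the compute method above yields one (segmentId, segmentFile) pair per
// CarbonMergeFilePartition after the index-merge work is finished.
def singleResultIterator[T](result: => T): Iterator[T] = new Iterator[T] {
  private var finished = false          // flips to true after the result is emitted

  override def hasNext: Boolean = !finished

  override def next(): T = {
    finished = true
    result                              // computed lazily, only when requested
  }
}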
QiangCai commented on a change in pull request #3535: [WIP] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#discussion_r361888397

File path: integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala

@@ -118,6 +147,8 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
             carbonTable = carbonMainTable,
             mergeIndexProperty = true,
             readFileFooterFromCarbonDataFile = true)
+          LOGGER.info("Total time taken for merge index " +
+            (System.currentTimeMillis() - startTime))

Review comment: accepted
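The added log line follows the usual elapsed-time pattern. A self-contained sketch of that pattern (using plain SLF4J rather than CarbonData's logger, with an invented wrapper name and an invented mergeIndexFiles() call in the usage comment) could look like this:

import org.slf4j.{Logger, LoggerFactory}

// Illustrative only: run a block of work and log how long it took, the same
// System.currentTimeMillis() bookkeeping the added LOGGER.info line relies on.
object MergeTiming {
  private val LOGGER: Logger = LoggerFactory.getLogger(getClass)

  def logged[T](label: String)(body: => T): T = {
    val startTime = System.currentTimeMillis()
    val result = body
    LOGGER.info(label + " " + (System.currentTimeMillis() - startTime))
    result
  }
}

// e.g. MergeTiming.logged("Total time taken for merge index") { mergeIndexFiles() }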
QiangCai commented on a change in pull request #3535: [WIP] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#discussion_r361888399

File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala

@@ -206,16 +215,193 @@ with Serializable {
 case class CarbonSQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Boolean)
   extends SQLHadoopMapReduceCommitProtocol(jobId, path, isAppend) {
+
+  override def setupTask(taskContext: TaskAttemptContext): Unit = {
+    if (isCarbonDataFlow(taskContext.getConfiguration)) {
+      ThreadLocalSessionInfo.setConfigurationToCurrentThread(taskContext.getConfiguration)
+    }
+    super.setupTask(taskContext)
+  }
+
+  override def commitJob(jobContext: JobContext,
+      taskCommits: Seq[TaskCommitMessage]): Unit = {
+    if (isCarbonDataFlow(jobContext.getConfiguration)) {
+      var dataSize = 0L
+      val partitions =
+        taskCommits
+          .flatMap { taskCommit =>
+            taskCommit.obj match {
+              case (map: Map[String, String], _) =>
+                val partition = map.get("carbon.partitions")
+                val size = map.get("carbon.datasize")
+                if (size.isDefined) {
+                  dataSize = dataSize + java.lang.Long.parseLong(size.get)
+                }
+                if (partition.isDefined) {
+                  ObjectSerializationUtil
+                    .convertStringToObject(partition.get)
+                    .asInstanceOf[util.ArrayList[String]]
+                    .asScala
+                } else {
+                  Array.empty[String]
+                }
+              case _ => Array.empty[String]
+            }
+          }
+          .distinct
+          .toList
+          .asJava
+
+      jobContext.getConfiguration.set(
+        "carbon.output.partitions.name",
+        ObjectSerializationUtil.convertObjectToString(partitions))
+      jobContext.getConfiguration.set("carbon.datasize", dataSize.toString)
+
+      val newTaskCommits = taskCommits.map { taskCommit =>
+        taskCommit.obj match {
+          case (map: Map[String, String], set) =>
+            new TaskCommitMessage(
+              map
+                .filterNot(e => "carbon.partitions".equals(e._1) || "carbon.datasize".equals(e._1)),
+              set)
+          case _ => taskCommit
+        }
+      }
+      super
+        .commitJob(jobContext, newTaskCommits)
+    } else {
+      super
+        .commitJob(jobContext, taskCommits)
+    }
+  }
+
+  override def commitTask(

Review comment: done
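To make the aggregation in the commitJob override easier to follow, here is a stripped-down, hypothetical sketch of the same idea: each task reports a small string map, and the driver sums the reported sizes and collects the distinct partition names. The helper name and the deserialization callback are assumptions for illustration, not CarbonData APIs.

// Illustrative sketch: fold per-task payloads into a job-level summary.
// "carbon.partitions" is assumed to hold a serialized list of partition specs
// and "carbon.datasize" the bytes written by that task.
def aggregateTaskPayloads(
    payloads: Seq[Map[String, String]],
    deserializePartitions: String => Seq[String]): (Seq[String], Long) = {
  var dataSize = 0L
  val partitions = payloads.flatMap { map =>
    map.get("carbon.datasize").foreach(size => dataSize += java.lang.Long.parseLong(size))
    map.get("carbon.partitions").map(deserializePartitions).getOrElse(Seq.empty)
  }.distinct
  (partitions, dataSize)
}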
QiangCai commented on a change in pull request #3535: [WIP] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#discussion_r361888406

File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala

@@ -206,16 +215,193 @@ with Serializable {
 case class CarbonSQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Boolean)
   extends SQLHadoopMapReduceCommitProtocol(jobId, path, isAppend) {
+
+  override def setupTask(taskContext: TaskAttemptContext): Unit = {
+    if (isCarbonDataFlow(taskContext.getConfiguration)) {
+      ThreadLocalSessionInfo.setConfigurationToCurrentThread(taskContext.getConfiguration)
+    }
+    super.setupTask(taskContext)
+  }
+
+  override def commitJob(jobContext: JobContext,

Review comment: done
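The setupTask change relies on a thread-local holder so that the Hadoop Configuration prepared for the task is visible to Carbon code further down the call stack. ThreadLocalSessionInfo is the real Carbon holder; the stand-alone sketch below only illustrates the pattern, with an assumed object name.

import org.apache.hadoop.conf.Configuration

// Simplified stand-in for a thread-local configuration holder: set at task
// setup, read anywhere on the same thread, cleared when the task finishes.
object SessionConfHolder {
  private val holder = new ThreadLocal[Configuration]

  def setConfigurationToCurrentThread(conf: Configuration): Unit = holder.set(conf)

  def getOrNull: Configuration = holder.get()

  def unsetAll(): Unit = holder.remove()
}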
QiangCai commented on a change in pull request #3535: [WIP] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#discussion_r361888415

File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala

@@ -206,16 +215,193 @@ with Serializable {
 case class CarbonSQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Boolean)
   extends SQLHadoopMapReduceCommitProtocol(jobId, path, isAppend) {
+
+  override def setupTask(taskContext: TaskAttemptContext): Unit = {
+    if (isCarbonDataFlow(taskContext.getConfiguration)) {
+      ThreadLocalSessionInfo.setConfigurationToCurrentThread(taskContext.getConfiguration)
+    }
+    super.setupTask(taskContext)
+  }
+
+  override def commitJob(jobContext: JobContext,
+      taskCommits: Seq[TaskCommitMessage]): Unit = {
+    if (isCarbonDataFlow(jobContext.getConfiguration)) {
+      var dataSize = 0L
+      val partitions =
+        taskCommits
+          .flatMap { taskCommit =>
+            taskCommit.obj match {
+              case (map: Map[String, String], _) =>
+                val partition = map.get("carbon.partitions")
+                val size = map.get("carbon.datasize")
+                if (size.isDefined) {
+                  dataSize = dataSize + java.lang.Long.parseLong(size.get)
+                }
+                if (partition.isDefined) {
+                  ObjectSerializationUtil
+                    .convertStringToObject(partition.get)
+                    .asInstanceOf[util.ArrayList[String]]
+                    .asScala
+                } else {
+                  Array.empty[String]
+                }
+              case _ => Array.empty[String]
+            }
+          }
+          .distinct
+          .toList
+          .asJava
+
+      jobContext.getConfiguration.set(
+        "carbon.output.partitions.name",
+        ObjectSerializationUtil.convertObjectToString(partitions))
+      jobContext.getConfiguration.set("carbon.datasize", dataSize.toString)
+
+      val newTaskCommits = taskCommits.map { taskCommit =>
+        taskCommit.obj match {
+          case (map: Map[String, String], set) =>
+            new TaskCommitMessage(
+              map
+                .filterNot(e => "carbon.partitions".equals(e._1) || "carbon.datasize".equals(e._1)),
+              set)
+          case _ => taskCommit
+        }
+      }
+      super
+        .commitJob(jobContext, newTaskCommits)
+    } else {
+      super
+        .commitJob(jobContext, taskCommits)
+    }
+  }
+
+  override def commitTask(
+      taskContext: TaskAttemptContext
+  ): FileCommitProtocol.TaskCommitMessage = {
+    var taskMsg = super.commitTask(taskContext)
+    if (isCarbonDataFlow(taskContext.getConfiguration)) {
+      ThreadLocalSessionInfo.unsetAll()
+      val partitions: String = taskContext.getConfiguration.get("carbon.output.partitions.name", "")
+      val files = taskContext.getConfiguration.get("carbon.output.files.name", "")
+      var sum = 0L
+      var indexSize = 0L
+      if (!StringUtils.isEmpty(files)) {
+        val filesList = ObjectSerializationUtil
+          .convertStringToObject(files)
+          .asInstanceOf[util.ArrayList[String]]
+          .asScala
+        for (file <- filesList) {
+          if (file.contains(".carbondata")) {
+            sum += java.lang.Long.parseLong(file.substring(file.lastIndexOf(":") + 1))
+          } else if (file.contains(".carbonindex")) {
+            indexSize += java.lang.Long.parseLong(file.substring(file.lastIndexOf(":") + 1))
+          }
+        }
+      }
+      if (!StringUtils.isEmpty(partitions)) {
+        taskMsg = taskMsg.obj match {
+          case (map: Map[String, String], set) =>
+            new TaskCommitMessage(
+              map ++ Map("carbon.partitions" -> partitions, "carbon.datasize" -> sum.toString),
+              set)
+          case _ => taskMsg
+        }
+      }
+      // Update outputMetrics with carbondata and index size
+      TaskContext.get().taskMetrics().outputMetrics.setBytesWritten(sum + indexSize)
+    }
+    taskMsg
+  }
+
+  override def abortTask(taskContext: TaskAttemptContext): Unit = {
+    super.abortTask(taskContext)
+    if (isCarbonDataFlow(taskContext.getConfiguration)) {
+      val files = taskContext.getConfiguration.get("carbon.output.files.name", "")
+      if (!StringUtils.isEmpty(files)) {
+        val filesList = ObjectSerializationUtil
+          .convertStringToObject(files)
+          .asInstanceOf[util.ArrayList[String]]
+          .asScala
+        for (file <- filesList) {
+          val outputFile: String = file.substring(0, file.lastIndexOf(":"))
+          if (outputFile.endsWith(CarbonTablePath.CARBON_DATA_EXT)) {
+            FileFactory
+              .deleteAllCarbonFilesOfDir(FileFactory
+                .getCarbonFile(outputFile,
+                  taskContext.getConfiguration))
+          }
+        }
+      }
+      ThreadLocalSessionInfo.unsetAll()
+    }
+  }
+
   override def newTaskTempFileAbsPath(taskContext: TaskAttemptContext,
       absoluteDir: String,
       ext: String): String = {
-    val carbonFlow = taskContext.getConfiguration.get("carbon.commit.protocol")
-    if (carbonFlow != null) {
+    if (isCarbonFileFlow(taskContext.getConfiguration) ||
+        isCarbonDataFlow(taskContext.getConfiguration)) {
       super.newTaskTempFile(taskContext, Some(absoluteDir), ext)
     } else {
       super.newTaskTempFileAbsPath(taskContext, absoluteDir, ext)
     }
   }
+
+  override def newTaskTempFile(taskContext: TaskAttemptContext,

Review comment: done
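The commitTask and abortTask overrides above depend on the writer recording each output file as "<path>:<length>". A hedged sketch of that size bookkeeping is shown below; the helper name and example paths are made up. The commit path splits the totals by file extension, while the abort path reuses the same entries to find the data files to delete.

// Illustrative sketch: split the "<path>:<size>" entries reported by the
// writer into data-file bytes (.carbondata) and index-file bytes (.carbonindex).
def splitSizes(files: Seq[String]): (Long, Long) = {
  var dataSize = 0L
  var indexSize = 0L
  for (file <- files) {
    val size = java.lang.Long.parseLong(file.substring(file.lastIndexOf(":") + 1))
    if (file.contains(".carbondata")) {
      dataSize += size
    } else if (file.contains(".carbonindex")) {
      indexSize += size
    }
  }
  (dataSize, indexSize)
}

// e.g. splitSizes(Seq("/tmp/part-0.carbondata:1024", "/tmp/part-0.carbonindex:128"))
// returns (1024, 128); commitTask reports their sum as bytes written.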
QiangCai commented on a change in pull request #3535: [WIP] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#discussion_r361888435

File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala

@@ -432,4 +541,77 @@ private class CarbonOutputWriter(path: String,
       Array.empty
     }
   }
+
+  def splitPartition(p: String): (String, String) = {
+    val value = p.substring(p.indexOf("=") + 1, p.length)
+    val col = p.substring(0, p.indexOf("="))
+    // Null handling case. For null values, hive creates the partition with this special name
+    if (value.equals("__HIVE_DEFAULT_PARTITION__")) {
+      (col, null)
+      // we should replace back the special string with an empty value.
+    } else if (value.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
+      (col, "")
+    } else {
+      (col, value)
+    }
+  }
+
+  def updatePartitions(

Review comment: done
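Since splitPartition is the piece that undoes the special partition-value encodings, a self-contained sketch of the same logic is given below. CARBON_EMPTY_VALUE is only a placeholder for CarbonCommonConstants.MEMBER_DEFAULT_VAL (the real constant is not reproduced here), and the function name differs from the PR's.

// Placeholder for CarbonCommonConstants.MEMBER_DEFAULT_VAL, the sentinel Carbon
// writes for an empty partition value (assumed value, not the real constant).
val CARBON_EMPTY_VALUE = "<member-default-val>"

// Turn a "col=value" partition spec back into a (column, value) pair,
// mapping Hive's null sentinel to null and Carbon's empty sentinel to "".
def splitPartitionSpec(p: String): (String, String) = {
  val col = p.substring(0, p.indexOf("="))
  val value = p.substring(p.indexOf("=") + 1)
  if (value == "__HIVE_DEFAULT_PARTITION__") {
    (col, null)
  } else if (value == CARBON_EMPTY_VALUE) {
    (col, "")
  } else {
    (col, value)
  }
}

// e.g. splitPartitionSpec("country=__HIVE_DEFAULT_PARTITION__") returns ("country", null)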
CarbonDataQA1 commented on issue #3535: [WIP] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#issuecomment-569572781 Build Success with Spark 2.1.0, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.1/1342/
CarbonDataQA1 commented on issue #3535: [WIP] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#issuecomment-569577867 Build Failed with Spark 2.2.1, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.2/1352/
CarbonDataQA1 commented on issue #3535: [CARBONDATA-3641] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#issuecomment-569579090 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1365/
jackylk commented on issue #3535: [CARBONDATA-3641] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#issuecomment-569579581 LGTM, spark 2.3 integration CI has passed
CarbonDataQA1 commented on issue #3535: [CARBONDATA-3641] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#issuecomment-569579636 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1368/
asfgit closed pull request #3535: [CARBONDATA-3641] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535
CarbonDataQA1 commented on issue #3535: [CARBONDATA-3641] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#issuecomment-569581817 Build Success with Spark 2.1.0, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.1/1347/
CarbonDataQA1 commented on issue #3535: [CARBONDATA-3641] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#issuecomment-569589112 Build Failed with Spark 2.2.1, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.2/1357/
brijoobopanna commented on issue #3535: [CARBONDATA-3641] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#issuecomment-570160857 @QiangCai please check whether this PR has introduced any CI issues in the open-source builds
akashrn5 commented on issue #3535: [CARBONDATA-3641] Refactory data loading for partition table
URL: https://github.com/apache/carbondata/pull/3535#issuecomment-570162019 @QiangCai I think the Spark 2.2 CI is continuously failing with this PR, can you please have a look at it?