GitHub user QiangCai opened a pull request:
    https://github.com/apache/carbondata/pull/2971

    [TEST] Test loading performance of range_sort

    For global_sort, add an option 'range_column':

    LOAD DATA LOCAL INPATH 'xxx' INTO TABLE xxx OPTIONS('range_column'='a column name')

    During data loading:
    1. range partition the input data by range_column
    2. for each range, execute the local sort step to load the data

    Be sure to do all of the following checklist to help us incorporate
    your contribution quickly and easily:

     - [ ] Any interfaces changed?
     - [ ] Any backward compatibility impacted?
     - [ ] Document update required?
     - [ ] Testing done
           Please provide details on
           - Whether new unit test cases have been added or why no new tests are required?
           - How it is tested? Please attach test report.
           - Is it a performance related change? Please attach the performance test report.
           - Any additional information to help reviewers in testing this change.
     - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/QiangCai/carbondata range_sort

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2971.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2971

----
commit 9d855bc65aa85efb84ad3e54f3188a16e1a58d3b
Author: QiangCai <qiangcai@...>
Date:   2018-12-03T08:37:57Z

    support range sort

----
---
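The two loading steps in the description (range partition by range_column, then sort within each range) can be sketched in plain Scala without Spark. This is a minimal in-memory illustration, not the code in the pull request: `RangeSortSketch`, `rangeBounds`, `rangeOf`, `rangeSort`, and the `(Int, String)` row shape are hypothetical names chosen for the example, and the evenly spaced bounds are an assumed stand-in for whatever sampling a real range partitioner performs.

```scala
object RangeSortSketch {
  // A row is (rangeColumnValue, payload); both fields are illustrative.
  type Row = (Int, String)

  /** Pick up to numRanges - 1 boundary keys, evenly spaced over the sorted keys. */
  def rangeBounds(keys: Seq[Int], numRanges: Int): Vector[Int] = {
    val sorted = keys.sorted.toVector
    if (sorted.isEmpty || numRanges <= 1) Vector.empty
    else (1 until numRanges).map(i => sorted(i * sorted.length / numRanges)).distinct.toVector
  }

  /** Index of the range a key falls into: the number of bounds that are <= key. */
  def rangeOf(key: Int, bounds: Vector[Int]): Int = bounds.count(_ <= key)

  /** Step 1: range-partition rows by key. Step 2: sort each range on its own. */
  def rangeSort(rows: Seq[Row], numRanges: Int): Vector[Vector[Row]] = {
    val bounds = rangeBounds(rows.map(_._1), numRanges)
    val grouped = rows.groupBy(r => rangeOf(r._1, bounds))
    (0 to bounds.length)
      .map(i => grouped.getOrElse(i, Seq.empty[Row]).sortBy(_._1).toVector)
      .toVector
  }
}
```

Because every key in range i is smaller than every key in range i + 1, sorting each range independently still yields data that is ordered across ranges, which is why a local sort per range can replace one sort over the whole input in this sketch.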
Github user Indhumathi27 commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/2971#discussion_r238184022

    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala ---
    @@ -305,4 +307,107 @@ object DataLoadProcessorStepOnSpark {
             e)
         }
       }
    +
    +  def sortAdnWriteFunc(
    --- End diff --

    Please change the method name from sortAdnWriteFunc to sortAndWriteFunc.
---
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2971 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1617/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2971 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1828/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2971 Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9877/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2971 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1622/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2971 Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9882/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2971 Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1833/ --- |
Github user qiuchenjian commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/2971#discussion_r238290309

    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
    @@ -156,4 +158,132 @@ object DataLoadProcessBuilderOnSpark {
           Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
         }
       }
    +
    +  /**
    +   * 1. range partition the whole input data
    +   * 2. for each range, sort the data and writ it to CarbonData files
    +   */
    +  def loadDataUsingRangeSort(
    +      sparkSession: SparkSession,
    +      dataFrame: Option[DataFrame],
    +      model: CarbonLoadModel,
    +      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
    +    val originRDD = if (dataFrame.isDefined) {
    --- End diff --

    This method duplicates much of the code in loadDataUsingGlobalSort. I recommend refactoring these two methods.
---
Github user qiuchenjian commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/2971#discussion_r238292207

    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
    @@ -156,4 +158,132 @@ object DataLoadProcessBuilderOnSpark {
           Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
         }
       }
    +
    +  /**
    +   * 1. range partition the whole input data
    +   * 2. for each range, sort the data and writ it to CarbonData files
    +   */
    +  def loadDataUsingRangeSort(
    +      sparkSession: SparkSession,
    +      dataFrame: Option[DataFrame],
    +      model: CarbonLoadModel,
    +      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
    +    val originRDD = if (dataFrame.isDefined) {
    +      dataFrame.get.rdd
    +    } else {
    +      // input data from files
    +      val columnCount = model.getCsvHeaderColumns.length
    +      CsvRDDHelper.csvFileScanRDD(sparkSession, model, hadoopConf)
    +        .map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
    +    }
    +    val sc = sparkSession.sparkContext
    +    val modelBroadcast = sc.broadcast(model)
    +    val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
    +    val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator")
    +    val convertStepRowCounter = sc.accumulator(0, "Convert Processor Accumulator")
    +    val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
    +    val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
    +    hadoopConf
    +      .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
    +    val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
    +    // 1. Input
    +    val inputRDD = originRDD
    +      .mapPartitions(rows => DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast))
    +      .mapPartitionsWithIndex { case (index, rows) =>
    +        DataLoadProcessorStepOnSpark.inputFunc(rows, index, modelBroadcast, inputStepRowCounter)
    +      }
    +    // 2. Convert
    +    val convertRDD = inputRDD.mapPartitionsWithIndex { case (index, rows) =>
    +      ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
    +      DataLoadProcessorStepOnSpark.convertFunc(rows, index, modelBroadcast, partialSuccessAccum,
    +        convertStepRowCounter)
    +    }.filter(_ != null)
    +    // 3. Range partition
    +    val configuration = DataLoadProcessBuilder.createConfiguration(model)
    +    val objectOrdering: Ordering[Object] = createOrderingForColumn(model.getRangePartitionColumn)
    +    var numPartitions = CarbonDataProcessorUtil.getGlobalSortPartitions(
    +      configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS))
    +    if (numPartitions <= 0) {
    +      if (model.getTotalSize <= 0) {
    +        numPartitions = convertRDD.partitions.length
    +      } else {
    +        // calculate the number of partitions
    +        // better to generate a CarbonData file for each partition
    +        val totalSize = model.getTotalSize.toDouble
    +        val table = model.getCarbonDataLoadSchema.getCarbonTable
    +        val blockSize = 1024L * 1024 * table.getBlockSizeInMB
    +        val blockletSize = 1024L * 1024 * table.getBlockletSizeInMB
    +        // here it assumes the compression ratio of CarbonData is about 33%,
    +        // so it multiply by 3 to get the split size of CSV files.
    +        val splitSize = Math.max(blockletSize, (blockSize - blockletSize)) * 3
    +        numPartitions = Math.ceil(totalSize / splitSize).toInt
    --- End diff --

    If inserting using a DataFrame, I think totalSize will be 0.
---
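The partition-count logic in the quoted diff, including the fallback for an unknown input size that this comment asks about, can be restated as a standalone function. This is a sketch that mirrors the arithmetic in the diff under stated assumptions: `RangePartitionCount`, `numPartitions`, and its parameter names are hypothetical, and plain numbers stand in for the values the diff reads from `CarbonLoadModel` and the table schema.

```scala
object RangePartitionCount {
  private val MB = 1024L * 1024

  /**
   * @param configured      configured global sort partition count (<= 0 if unset)
   * @param totalSizeBytes  input size in bytes (<= 0 if unknown)
   * @param inputPartitions partition count of the converted input, used as the fallback
   * @param blockSizeMB     table block size in MB
   * @param blockletSizeMB  table blocklet size in MB
   */
  def numPartitions(
      configured: Int,
      totalSizeBytes: Long,
      inputPartitions: Int,
      blockSizeMB: Int,
      blockletSizeMB: Int): Int = {
    if (configured > 0) {
      configured
    } else if (totalSizeBytes <= 0) {
      // Size unknown: keep the partitioning of the input as it is.
      inputPartitions
    } else {
      val blockSize = MB * blockSizeMB
      val blockletSize = MB * blockletSizeMB
      // The diff assumes CarbonData compresses CSV to about one third,
      // so the CSV split size is three times the target file size.
      val splitSize = math.max(blockletSize, blockSize - blockletSize) * 3
      math.ceil(totalSizeBytes.toDouble / splitSize).toInt
    }
  }
}
```

As a worked example, with a 1024 MB block and a 64 MB blocklet the split size is (1024 - 64) * 3 = 2880 MB, so a 10240 MB input gives ceil(10240 / 2880) = 4 partitions. When the total size is 0, the function returns the input partition count instead of dividing.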
Github user qiuchenjian commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/2971#discussion_r238297683

    --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java ---
    @@ -188,6 +188,8 @@
         optionsFinal.put(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB,
             Maps.getOrDefault(options, CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB,
                 CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB_DEFAULT));
    +
    +    optionsFinal.put("range_column", Maps.getOrDefault(options, "range_column", null));
    --- End diff --

    Does makeCreateTableString also need to add "range_column"?
---
Github user QiangCai commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/2971#discussion_r238505807

    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
    @@ -156,4 +158,132 @@ object DataLoadProcessBuilderOnSpark {
           Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
         }
       }
    +
    +  /**
    +   * 1. range partition the whole input data
    +   * 2. for each range, sort the data and writ it to CarbonData files
    +   */
    +  def loadDataUsingRangeSort(
    +      sparkSession: SparkSession,
    +      dataFrame: Option[DataFrame],
    +      model: CarbonLoadModel,
    +      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
    +    val originRDD = if (dataFrame.isDefined) {
    +      dataFrame.get.rdd
    +    } else {
    +      // input data from files
    +      val columnCount = model.getCsvHeaderColumns.length
    +      CsvRDDHelper.csvFileScanRDD(sparkSession, model, hadoopConf)
    +        .map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
    +    }
    +    val sc = sparkSession.sparkContext
    +    val modelBroadcast = sc.broadcast(model)
    +    val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
    +    val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator")
    +    val convertStepRowCounter = sc.accumulator(0, "Convert Processor Accumulator")
    +    val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
    +    val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
    +    hadoopConf
    +      .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
    +    val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
    +    // 1. Input
    +    val inputRDD = originRDD
    +      .mapPartitions(rows => DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast))
    +      .mapPartitionsWithIndex { case (index, rows) =>
    +        DataLoadProcessorStepOnSpark.inputFunc(rows, index, modelBroadcast, inputStepRowCounter)
    +      }
    +    // 2. Convert
    +    val convertRDD = inputRDD.mapPartitionsWithIndex { case (index, rows) =>
    +      ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
    +      DataLoadProcessorStepOnSpark.convertFunc(rows, index, modelBroadcast, partialSuccessAccum,
    +        convertStepRowCounter)
    +    }.filter(_ != null)
    +    // 3. Range partition
    +    val configuration = DataLoadProcessBuilder.createConfiguration(model)
    +    val objectOrdering: Ordering[Object] = createOrderingForColumn(model.getRangePartitionColumn)
    +    var numPartitions = CarbonDataProcessorUtil.getGlobalSortPartitions(
    +      configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS))
    +    if (numPartitions <= 0) {
    +      if (model.getTotalSize <= 0) {
    +        numPartitions = convertRDD.partitions.length
    +      } else {
    +        // calculate the number of partitions
    +        // better to generate a CarbonData file for each partition
    +        val totalSize = model.getTotalSize.toDouble
    +        val table = model.getCarbonDataLoadSchema.getCarbonTable
    +        val blockSize = 1024L * 1024 * table.getBlockSizeInMB
    +        val blockletSize = 1024L * 1024 * table.getBlockletSizeInMB
    +        // here it assumes the compression ratio of CarbonData is about 33%,
    +        // so it multiply by 3 to get the split size of CSV files.
    +        val splitSize = Math.max(blockletSize, (blockSize - blockletSize)) * 3
    +        numPartitions = Math.ceil(totalSize / splitSize).toInt
    --- End diff --

    Yes, insert will use global sort.
---
Github user QiangCai commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/2971#discussion_r238505859

    --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java ---
    @@ -188,6 +188,8 @@
         optionsFinal.put(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB,
             Maps.getOrDefault(options, CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB,
                 CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB_DEFAULT));
    +
    +    optionsFinal.put("range_column", Maps.getOrDefault(options, "range_column", null));
    --- End diff --

    For now, it only tries to support the LOAD DATA command.
---
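Taken together, the two replies from QiangCai suggest a simple selection rule: range sort applies only to LOAD DATA with 'range_column' set, and DataFrame inserts keep using global sort. The sketch below illustrates that rule only. It is inferred from the discussion rather than taken from the pull request's code, and `LoadFlowSketch`, `LoadFlow`, `rangeColumn`, `chooseFlow`, and the `isDataFrameInsert` flag are all hypothetical names.

```scala
object LoadFlowSketch {
  sealed trait LoadFlow
  case object RangeSort extends LoadFlow
  case object GlobalSort extends LoadFlow

  /** Read 'range_column' from the load options; null, blank, or missing means "not set". */
  def rangeColumn(options: Map[String, String]): Option[String] =
    options.get("range_column").flatMap(Option(_)).map(_.trim).filter(_.nonEmpty)

  /** Range sort only for LOAD DATA with a range column; everything else uses global sort. */
  def chooseFlow(options: Map[String, String], isDataFrameInsert: Boolean): LoadFlow =
    if (!isDataFrameInsert && rangeColumn(options).isDefined) RangeSort else GlobalSort
}
```

Wrapping the option in `Option` avoids passing a null column name downstream, which is the value the quoted `Maps.getOrDefault(options, "range_column", null)` call stores when the option is absent.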
Github user QiangCai commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/2971#discussion_r238507037

    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
    @@ -156,4 +158,132 @@ object DataLoadProcessBuilderOnSpark {
           Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
         }
       }
    +
    +  /**
    +   * 1. range partition the whole input data
    +   * 2. for each range, sort the data and writ it to CarbonData files
    +   */
    +  def loadDataUsingRangeSort(
    +      sparkSession: SparkSession,
    +      dataFrame: Option[DataFrame],
    +      model: CarbonLoadModel,
    +      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
    +    val originRDD = if (dataFrame.isDefined) {
    --- End diff --

    Refactoring would be better, but after refactoring the code logic would not be clear. These two flows already reuse the process steps.
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2971 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1627/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2971 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1838/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2971 Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9887/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2971 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1630/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2971 Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9890/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2971 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1841/ --- |