[GitHub] carbondata pull request #2971: [TEST] Test loading performance of range_sort

qiuchenjian-2
GitHub user QiangCai opened a pull request:

    https://github.com/apache/carbondata/pull/2971

    [TEST] Test loading performance of range_sort

    For global_sort, add an option 'range_column':
    LOAD DATA LOCAL INPATH 'xxx'  INTO TABLE xxx OPTIONS('range_column'='a column name')

    During data loading (a minimal sketch follows the steps below):
    1. range-partition the input data by range_column
    2. for each range, execute the local sort step to load the data
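
    A minimal sketch of these two steps, assuming a generic Spark pair RDD
    keyed by the range_column value (the names below are illustrative, not
    the PR's actual API):

        import scala.reflect.ClassTag
        import org.apache.spark.RangePartitioner
        import org.apache.spark.rdd.RDD

        // Sketch: range-partition rows by the range_column key, then sort each
        // range locally before it is written out. For simplicity this buffers a
        // partition in memory, which the real loader avoids.
        def rangeSortLoad[K: Ordering : ClassTag](
            rows: RDD[(K, Array[AnyRef])],
            numPartitions: Int): RDD[(K, Array[AnyRef])] = {
          // 1. Range partition: contiguous key ranges map to one partition each.
          val ranged = rows.partitionBy(new RangePartitioner(numPartitions, rows))
          // 2. Local sort: order rows within each partition; no further shuffle.
          ranged.mapPartitions(iter => iter.toArray.sortBy(_._1).iterator,
            preservesPartitioning = true)
        }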
   
   
    Be sure to do all of the following checklist to help us incorporate
    your contribution quickly and easily:
   
     - [ ] Any interfaces changed?
     
     - [ ] Any backward compatibility impacted?
     
     - [ ] Document update required?
   
     - [ ] Testing done
            Please provide details on
            - Whether new unit test cases have been added or why no new tests are required?
            - How it is tested? Please attach test report.
            - Is it a performance related change? Please attach the performance test report.
            - Any additional information to help reviewers in testing this change.
           
     - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
   


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/QiangCai/carbondata range_sort

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2971.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2971
   
----
commit 9d855bc65aa85efb84ad3e54f3188a16e1a58d3b
Author: QiangCai <qiangcai@...>
Date:   2018-12-03T08:37:57Z

    support range sort

----


---

[GitHub] carbondata pull request #2971: [TEST] Test loading performance of range_sort

qiuchenjian-2
Github user Indhumathi27 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2971#discussion_r238184022
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala ---
    @@ -305,4 +307,107 @@ object DataLoadProcessorStepOnSpark {
               e)
         }
       }
    +
    +  def sortAdnWriteFunc(
    --- End diff --
   
    Please change the method name from sortAdnWriteFunc to sortAndWriteFunc


---

[GitHub] carbondata issue #2971: [TEST] Test loading performance of range_sort

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2971
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1617/



---

[GitHub] carbondata issue #2971: [TEST] Test loading performance of range_sort

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2971
 
    Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1828/



---

[GitHub] carbondata issue #2971: [TEST] Test loading performance of range_sort

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2971
 
    Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9877/



---

[GitHub] carbondata issue #2971: [TEST] Test loading performance of range_sort

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2971
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1622/



---

[GitHub] carbondata issue #2971: [TEST] Test loading performance of range_sort

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2971
 
    Build Failed  with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9882/



---

[GitHub] carbondata issue #2971: [TEST] Test loading performance of range_sort

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2971
 
    Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1833/



---

[GitHub] carbondata pull request #2971: [TEST] Test loading performance of range_sort

qiuchenjian-2
Github user qiuchenjian commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2971#discussion_r238290309
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
    @@ -156,4 +158,132 @@ object DataLoadProcessBuilderOnSpark {
           Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
         }
       }
    +
    +  /**
    +   * 1. range partition the whole input data
     +   * 2. for each range, sort the data and write it to CarbonData files
    +   */
    +  def loadDataUsingRangeSort(
    +      sparkSession: SparkSession,
    +      dataFrame: Option[DataFrame],
    +      model: CarbonLoadModel,
    +      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
    +    val originRDD = if (dataFrame.isDefined) {
    --- End diff --
   
    This method duplicates much of the code in loadDataUsingGlobalSort; I recommend refactoring these two methods to share it.


---

[GitHub] carbondata pull request #2971: [TEST] Test loading performance of range_sort

qiuchenjian-2
Github user qiuchenjian commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2971#discussion_r238292207
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
    @@ -156,4 +158,132 @@ object DataLoadProcessBuilderOnSpark {
           Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
         }
       }
    +
    +  /**
    +   * 1. range partition the whole input data
     +   * 2. for each range, sort the data and write it to CarbonData files
    +   */
    +  def loadDataUsingRangeSort(
    +      sparkSession: SparkSession,
    +      dataFrame: Option[DataFrame],
    +      model: CarbonLoadModel,
    +      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
    +    val originRDD = if (dataFrame.isDefined) {
    +      dataFrame.get.rdd
    +    } else {
    +      // input data from files
    +      val columnCount = model.getCsvHeaderColumns.length
    +      CsvRDDHelper.csvFileScanRDD(sparkSession, model, hadoopConf)
    +        .map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
    +    }
    +    val sc = sparkSession.sparkContext
    +    val modelBroadcast = sc.broadcast(model)
    +    val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
    +    val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator")
    +    val convertStepRowCounter = sc.accumulator(0, "Convert Processor Accumulator")
    +    val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
    +    val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
    +    hadoopConf
    +      .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
    +    val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
    +    // 1. Input
    +    val inputRDD = originRDD
    +      .mapPartitions(rows => DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast))
    +      .mapPartitionsWithIndex { case (index, rows) =>
    +        DataLoadProcessorStepOnSpark.inputFunc(rows, index, modelBroadcast, inputStepRowCounter)
    +      }
    +    // 2. Convert
    +    val convertRDD = inputRDD.mapPartitionsWithIndex { case (index, rows) =>
    +      ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
    +      DataLoadProcessorStepOnSpark.convertFunc(rows, index, modelBroadcast, partialSuccessAccum,
    +        convertStepRowCounter)
    +    }.filter(_ != null)
    +    // 3. Range partition
    +    val configuration = DataLoadProcessBuilder.createConfiguration(model)
    +    val objectOrdering: Ordering[Object] = createOrderingForColumn(model.getRangePartitionColumn)
    +    var numPartitions = CarbonDataProcessorUtil.getGlobalSortPartitions(
    +      configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS))
    +    if (numPartitions <= 0) {
    +      if (model.getTotalSize <= 0) {
    +        numPartitions = convertRDD.partitions.length
    +      } else {
    +        // calculate the number of partitions
    +        // better to generate a CarbonData file for each partition
    +        val totalSize = model.getTotalSize.toDouble
    +        val table = model.getCarbonDataLoadSchema.getCarbonTable
    +        val blockSize = 1024L * 1024 * table.getBlockSizeInMB
    +        val blockletSize = 1024L * 1024 * table.getBlockletSizeInMB
    +        // here it assumes the compression ratio of CarbonData is about 33%,
    +        // so it multiply by 3 to get the split size of CSV files.
    +        val splitSize = Math.max(blockletSize, (blockSize - blockletSize)) * 3
    +        numPartitions = Math.ceil(totalSize / splitSize).toInt
    --- End diff --
   
    If inserting via a DataFrame, I think totalSize will be 0.
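
    When totalSize is available (a file-based load), the formula in the diff
    resolves as in this worked example, assuming CarbonData's default
    1024 MB block and 64 MB blocklet sizes (illustrative numbers only;
    actual table properties may differ):

        val blockSize     = 1024L * 1024 * 1024        // 1024 MB
        val blockletSize  = 64L * 1024 * 1024          // 64 MB
        // max(64 MB, 1024 MB - 64 MB) * 3 = 960 MB * 3 = 2880 MB
        val splitSize     = Math.max(blockletSize, blockSize - blockletSize) * 3
        val totalSize     = 100L * 1024 * 1024 * 1024  // a 100 GB load
        // ceil(102400 MB / 2880 MB) = ceil(35.6) = 36 partitions
        val numPartitions = Math.ceil(totalSize.toDouble / splitSize).toInt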


---

[GitHub] carbondata pull request #2971: [TEST] Test loading performance of range_sort

qiuchenjian-2
Github user qiuchenjian commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2971#discussion_r238297683
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java ---
    @@ -188,6 +188,8 @@
         optionsFinal.put(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB,
             Maps.getOrDefault(options, CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB,
                 CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB_DEFAULT));
    +
    +    optionsFinal.put("range_column", Maps.getOrDefault(options, "range_column", null));
    --- End diff --
   
    Does makeCreateTableString also need to add "range_column"?


---

[GitHub] carbondata pull request #2971: [TEST] Test loading performance of range_sort

qiuchenjian-2
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2971#discussion_r238505807
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
    @@ -156,4 +158,132 @@ object DataLoadProcessBuilderOnSpark {
           Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
         }
       }
    +
    +  /**
    +   * 1. range partition the whole input data
     +   * 2. for each range, sort the data and write it to CarbonData files
    +   */
    +  def loadDataUsingRangeSort(
    +      sparkSession: SparkSession,
    +      dataFrame: Option[DataFrame],
    +      model: CarbonLoadModel,
    +      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
    +    val originRDD = if (dataFrame.isDefined) {
    +      dataFrame.get.rdd
    +    } else {
    +      // input data from files
    +      val columnCount = model.getCsvHeaderColumns.length
    +      CsvRDDHelper.csvFileScanRDD(sparkSession, model, hadoopConf)
    +        .map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
    +    }
    +    val sc = sparkSession.sparkContext
    +    val modelBroadcast = sc.broadcast(model)
    +    val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
    +    val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator")
    +    val convertStepRowCounter = sc.accumulator(0, "Convert Processor Accumulator")
    +    val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
    +    val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
    +    hadoopConf
    +      .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
    +    val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
    +    // 1. Input
    +    val inputRDD = originRDD
    +      .mapPartitions(rows => DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast))
    +      .mapPartitionsWithIndex { case (index, rows) =>
    +        DataLoadProcessorStepOnSpark.inputFunc(rows, index, modelBroadcast, inputStepRowCounter)
    +      }
    +    // 2. Convert
    +    val convertRDD = inputRDD.mapPartitionsWithIndex { case (index, rows) =>
    +      ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
    +      DataLoadProcessorStepOnSpark.convertFunc(rows, index, modelBroadcast, partialSuccessAccum,
    +        convertStepRowCounter)
    +    }.filter(_ != null)
    +    // 3. Range partition
    +    val configuration = DataLoadProcessBuilder.createConfiguration(model)
    +    val objectOrdering: Ordering[Object] = createOrderingForColumn(model.getRangePartitionColumn)
    +    var numPartitions = CarbonDataProcessorUtil.getGlobalSortPartitions(
    +      configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS))
    +    if (numPartitions <= 0) {
    +      if (model.getTotalSize <= 0) {
    +        numPartitions = convertRDD.partitions.length
    +      } else {
    +        // calculate the number of partitions
    +        // better to generate a CarbonData file for each partition
    +        val totalSize = model.getTotalSize.toDouble
    +        val table = model.getCarbonDataLoadSchema.getCarbonTable
    +        val blockSize = 1024L * 1024 * table.getBlockSizeInMB
    +        val blockletSize = 1024L * 1024 * table.getBlockletSizeInMB
    +        // here it assumes the compression ratio of CarbonData is about 33%,
    +        // so it multiply by 3 to get the split size of CSV files.
    +        val splitSize = Math.max(blockletSize, (blockSize - blockletSize)) * 3
    +        numPartitions = Math.ceil(totalSize / splitSize).toInt
    --- End diff --
   
    Yes, insert will use global sort.


---

[GitHub] carbondata pull request #2971: [TEST] Test loading performance of range_sort

qiuchenjian-2
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2971#discussion_r238505859
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java ---
    @@ -188,6 +188,8 @@
         optionsFinal.put(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB,
             Maps.getOrDefault(options, CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB,
                 CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB_DEFAULT));
    +
    +    optionsFinal.put("range_column", Maps.getOrDefault(options, "range_column", null));
    --- End diff --
   
    For now it only tries to support the LOAD DATA command.
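
    For example, loading with the option from the Spark shell (the path,
    table, and column names below are placeholders):

        spark.sql(
          """LOAD DATA LOCAL INPATH '/path/to/data.csv' INTO TABLE t
            |OPTIONS('range_column'='col1')""".stripMargin)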


---

[GitHub] carbondata pull request #2971: [TEST] Test loading performance of range_sort

qiuchenjian-2
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2971#discussion_r238507037
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
    @@ -156,4 +158,132 @@ object DataLoadProcessBuilderOnSpark {
           Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
         }
       }
    +
    +  /**
    +   * 1. range partition the whole input data
     +   * 2. for each range, sort the data and write it to CarbonData files
    +   */
    +  def loadDataUsingRangeSort(
    +      sparkSession: SparkSession,
    +      dataFrame: Option[DataFrame],
    +      model: CarbonLoadModel,
    +      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
    +    val originRDD = if (dataFrame.isDefined) {
    --- End diff --
   
    Refactoring would be better in principle, but afterwards the code logic is not as clear. As it is now, these two flows already reuse the processing steps.
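
    For illustration, one possible factoring that keeps the step reuse
    visible (a sketch built only from calls shown in the diff above; the
    helper name and exact types are assumptions):

        // Hypothetical helper pulling the shared input + convert steps out of
        // loadDataUsingGlobalSort and loadDataUsingRangeSort.
        private def inputAndConvert(
            originRDD: RDD[Row],
            modelBroadcast: Broadcast[CarbonLoadModel],
            conf: Broadcast[SerializableConfiguration],
            partialSuccessAccum: Accumulator[Int],
            inputStepRowCounter: Accumulator[Int],
            convertStepRowCounter: Accumulator[Int]): RDD[CarbonRow] = {
          originRDD
            .mapPartitions(rows =>
              DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast))
            .mapPartitionsWithIndex { case (index, rows) =>
              DataLoadProcessorStepOnSpark.inputFunc(rows, index, modelBroadcast,
                inputStepRowCounter)
            }
            .mapPartitionsWithIndex { case (index, rows) =>
              // make the broadcast Hadoop conf visible to this task's thread
              ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
              DataLoadProcessorStepOnSpark.convertFunc(rows, index, modelBroadcast,
                partialSuccessAccum, convertStepRowCounter)
            }
            .filter(_ != null)
        }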


---

[GitHub] carbondata issue #2971: [TEST] Test loading performance of range_sort

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2971
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1627/



---

[GitHub] carbondata issue #2971: [TEST] Test loading performance of range_sort

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2971
 
    Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1838/



---

[GitHub] carbondata issue #2971: [TEST] Test loading performance of range_sort

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2971
 
    Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9887/



---

[GitHub] carbondata issue #2971: [TEST] Test loading performance of range_sort

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2971
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1630/



---

[GitHub] carbondata issue #2971: [TEST] Test loading performance of range_sort

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2971
 
    Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9890/



---

[GitHub] carbondata issue #2971: [TEST] Test loading performance of range_sort

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2971
 
    Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1841/



---