[GitHub] carbondata pull request #2971: [TEST] Test loading performance of range_sort

classic Classic list List threaded Threaded
92 messages Options
12345
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2971: [CARBONDATA-3219] Support range partition the input ...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2971
 
    Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2310/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2971#discussion_r244670461
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
    @@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark {
           Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
         }
       }
    +
    +  /**
    +   * 1. range partition the whole input data
    +   * 2. for each range, sort the data and writ it to CarbonData files
    +   */
    +  def loadDataUsingRangeSort(
    --- End diff --
   
    Code is duplicated with method `loadDataUsingGlobalSort` , please try to extract common code and reuse here.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2971#discussion_r244680876
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
    @@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark {
           Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
         }
       }
    +
    +  /**
    +   * 1. range partition the whole input data
    +   * 2. for each range, sort the data and writ it to CarbonData files
    +   */
    +  def loadDataUsingRangeSort(
    +      sparkSession: SparkSession,
    +      model: CarbonLoadModel,
    +      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
    +    // initialize and prepare row counter
    +    val sc = sparkSession.sparkContext
    +    val modelBroadcast = sc.broadcast(model)
    +    val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
    +    val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator")
    +    val convertStepRowCounter = sc.accumulator(0, "Convert Processor Accumulator")
    +    val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
    +    val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
    +
    +    // 1. Input
    +    hadoopConf
    +      .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
    +    val inputRDD = CsvRDDHelper
    +      .csvFileScanRDD(sparkSession, model, hadoopConf)
    +      .mapPartitionsWithIndex { case (index, rows) =>
    +        DataLoadProcessorStepOnSpark
    +          .internalInputFunc(rows, index, modelBroadcast, inputStepRowCounter)
    +      }
    +
    +    // 2. Convert
    +    val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
    +    val convertRDD = inputRDD
    +      .mapPartitionsWithIndex { case (index, rows) =>
    +        ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
    +        DataLoadProcessorStepOnSpark
    +          .convertFunc(rows, index, modelBroadcast, partialSuccessAccum, convertStepRowCounter)
    +      }
    +      .filter(_ != null)
    +
    +    // 3. Range partition by range_column
    +    val configuration = DataLoadProcessBuilder.createConfiguration(model)
    +    val rangeColumnIndex =
    +      indexOfColumn(model.getRangePartitionColumn, configuration.getDataFields)
    +    // convert RDD[CarbonRow] to RDD[(rangeColumn, CarbonRow)]
    +    val keyRDD = convertRDD.keyBy(_.getObject(rangeColumnIndex))
    +    // range partition by key
    +    val numPartitions = getNumPartitions(configuration, model, convertRDD)
    +    val objectOrdering: Ordering[Object] = createOrderingForColumn(model.getRangePartitionColumn)
    +    import scala.reflect.classTag
    +    val sampleRDD = getSampleRDD(sparkSession, model, hadoopConf, configuration, modelBroadcast)
    +    val rangeRDD = keyRDD
    +      .partitionBy(
    +        new DataSkewRangePartitioner(numPartitions, sampleRDD)(objectOrdering, classTag[Object]))
    +      .map(_._2)
    +
    +    // 4. Sort and Write data
    +    sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
    +      DataLoadProcessorStepOnSpark.sortAndWriteFunc(rows, context.partitionId, modelBroadcast,
    +        writeStepRowCounter, conf.value.value))
    +
    +    // Log the number of rows in each step
    +    LOGGER.info("Total rows processed in step Input Processor: " + inputStepRowCounter.value)
    +    LOGGER.info("Total rows processed in step Data Converter: " + convertStepRowCounter.value)
    +    LOGGER.info("Total rows processed in step Sort Processor: " + sortStepRowCounter.value)
    +    LOGGER.info("Total rows processed in step Data Writer: " + writeStepRowCounter.value)
    +
    +    // Update status
    +    if (partialSuccessAccum.value != 0) {
    +      val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE +
    +                               "Partial_Success"
    +      val loadMetadataDetails = new LoadMetadataDetails()
    +      loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
    +      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
    +      executionErrors.failureCauses = FailureCauses.BAD_RECORDS
    +      Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
    +    } else {
    +      val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + "Success"
    +      val loadMetadataDetails = new LoadMetadataDetails()
    +      loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
    +      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
    +      Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
    +    }
    +  }
    +
    +  /**
    +   * provide RDD for sample
    +   * CSVRecordReader(univocity parser) will output only one column
    +   */
    +  private def getSampleRDD(
    +      sparkSession: SparkSession,
    +      model: CarbonLoadModel,
    +      hadoopConf: Configuration,
    +      configuration: CarbonDataLoadConfiguration,
    +      modelBroadcast: Broadcast[CarbonLoadModel]
    +  ): RDD[(Object, Object)] = {
    +    // initialize and prepare row counter
    +    val configuration = DataLoadProcessBuilder.createConfiguration(model)
    +    val header = configuration.getHeader
    +    val rangeColumn = model.getRangePartitionColumn
    +    val rangeColumnIndex = (0 until header.length).find{
    +      index =>
    +        header(index).equalsIgnoreCase(rangeColumn.getColName)
    +    }.get
    +    val rangeField = configuration
    +      .getDataFields
    +      .find(dataField => dataField.getColumn.getColName.equals(rangeColumn.getColName))
    +      .get
    +
    +    // 1. Input
    +    val newHadoopConf = new Configuration(hadoopConf)
    +    newHadoopConf
    +      .set(CSVInputFormat.SELECT_COLUMN_INDEX, "" + rangeColumnIndex)
    +    val inputRDD = CsvRDDHelper
    +      .csvFileScanRDD(sparkSession, model, newHadoopConf)
    +      .mapPartitionsWithIndex { case (index, rows) =>
    +        DataLoadProcessorStepOnSpark
    +          .internalSampleInputFunc(rows, rangeField, index, modelBroadcast)
    +      }
    +
    +    // 2. Convert
    +    val conf = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext, hadoopConf)
    --- End diff --
   
    Why don't you use the already brodcasted conf which is broadcasted in caller method.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2971#discussion_r244681062
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala ---
    @@ -95,6 +96,67 @@ object DataLoadProcessorStepOnSpark {
         }
       }
     
    +  def internalInputFunc(
    +      rows: Iterator[InternalRow],
    +      index: Int,
    +      modelBroadcast: Broadcast[CarbonLoadModel],
    +      rowCounter: Accumulator[Int]): Iterator[CarbonRow] = {
    +    val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
    +    val conf = DataLoadProcessBuilder.createConfiguration(model)
    +    val rowParser = new RowParserImpl(conf.getDataFields, conf)
    +    val isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(conf)
    +    TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
    +      wrapException(e, model)
    +    }
    +
    +    new Iterator[CarbonRow] {
    +      override def hasNext: Boolean = rows.hasNext
    +
    +      override def next(): CarbonRow = {
    +        var row : CarbonRow = null
    +        val rawRow =
    +          rows.next().asInstanceOf[GenericInternalRow].values.asInstanceOf[Array[Object]]
    +        if(isRawDataRequired) {
    +          row = new CarbonRow(rowParser.parseRow(rawRow), rawRow)
    +        } else {
    +          row = new CarbonRow(rowParser.parseRow(rawRow))
    +        }
    +        rowCounter.add(1)
    +        row
    +      }
    +    }
    +  }
    +
    +  def internalSampleInputFunc(
    --- End diff --
   
    Please unify `internalSampleInputFunc` and `internalInputFunc`


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2971#discussion_r244681164
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
    @@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark {
           Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
         }
       }
    +
    +  /**
    +   * 1. range partition the whole input data
    +   * 2. for each range, sort the data and writ it to CarbonData files
    +   */
    +  def loadDataUsingRangeSort(
    +      sparkSession: SparkSession,
    +      model: CarbonLoadModel,
    +      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
    +    // initialize and prepare row counter
    +    val sc = sparkSession.sparkContext
    +    val modelBroadcast = sc.broadcast(model)
    +    val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
    +    val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator")
    +    val convertStepRowCounter = sc.accumulator(0, "Convert Processor Accumulator")
    +    val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
    +    val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
    +
    +    // 1. Input
    +    hadoopConf
    +      .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
    +    val inputRDD = CsvRDDHelper
    +      .csvFileScanRDD(sparkSession, model, hadoopConf)
    +      .mapPartitionsWithIndex { case (index, rows) =>
    +        DataLoadProcessorStepOnSpark
    +          .internalInputFunc(rows, index, modelBroadcast, inputStepRowCounter)
    +      }
    +
    +    // 2. Convert
    +    val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
    +    val convertRDD = inputRDD
    +      .mapPartitionsWithIndex { case (index, rows) =>
    +        ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
    +        DataLoadProcessorStepOnSpark
    +          .convertFunc(rows, index, modelBroadcast, partialSuccessAccum, convertStepRowCounter)
    +      }
    +      .filter(_ != null)
    +
    +    // 3. Range partition by range_column
    +    val configuration = DataLoadProcessBuilder.createConfiguration(model)
    +    val rangeColumnIndex =
    +      indexOfColumn(model.getRangePartitionColumn, configuration.getDataFields)
    +    // convert RDD[CarbonRow] to RDD[(rangeColumn, CarbonRow)]
    +    val keyRDD = convertRDD.keyBy(_.getObject(rangeColumnIndex))
    +    // range partition by key
    +    val numPartitions = getNumPartitions(configuration, model, convertRDD)
    +    val objectOrdering: Ordering[Object] = createOrderingForColumn(model.getRangePartitionColumn)
    +    import scala.reflect.classTag
    +    val sampleRDD = getSampleRDD(sparkSession, model, hadoopConf, configuration, modelBroadcast)
    +    val rangeRDD = keyRDD
    +      .partitionBy(
    +        new DataSkewRangePartitioner(numPartitions, sampleRDD)(objectOrdering, classTag[Object]))
    +      .map(_._2)
    +
    +    // 4. Sort and Write data
    +    sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
    +      DataLoadProcessorStepOnSpark.sortAndWriteFunc(rows, context.partitionId, modelBroadcast,
    +        writeStepRowCounter, conf.value.value))
    +
    +    // Log the number of rows in each step
    +    LOGGER.info("Total rows processed in step Input Processor: " + inputStepRowCounter.value)
    +    LOGGER.info("Total rows processed in step Data Converter: " + convertStepRowCounter.value)
    +    LOGGER.info("Total rows processed in step Sort Processor: " + sortStepRowCounter.value)
    +    LOGGER.info("Total rows processed in step Data Writer: " + writeStepRowCounter.value)
    +
    +    // Update status
    +    if (partialSuccessAccum.value != 0) {
    +      val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE +
    +                               "Partial_Success"
    +      val loadMetadataDetails = new LoadMetadataDetails()
    +      loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
    +      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
    +      executionErrors.failureCauses = FailureCauses.BAD_RECORDS
    +      Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
    +    } else {
    +      val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + "Success"
    +      val loadMetadataDetails = new LoadMetadataDetails()
    +      loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
    +      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
    +      Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
    +    }
    +  }
    +
    +  /**
    +   * provide RDD for sample
    +   * CSVRecordReader(univocity parser) will output only one column
    +   */
    +  private def getSampleRDD(
    +      sparkSession: SparkSession,
    +      model: CarbonLoadModel,
    +      hadoopConf: Configuration,
    +      configuration: CarbonDataLoadConfiguration,
    +      modelBroadcast: Broadcast[CarbonLoadModel]
    +  ): RDD[(Object, Object)] = {
    +    // initialize and prepare row counter
    +    val configuration = DataLoadProcessBuilder.createConfiguration(model)
    +    val header = configuration.getHeader
    +    val rangeColumn = model.getRangePartitionColumn
    +    val rangeColumnIndex = (0 until header.length).find{
    +      index =>
    +        header(index).equalsIgnoreCase(rangeColumn.getColName)
    +    }.get
    +    val rangeField = configuration
    +      .getDataFields
    +      .find(dataField => dataField.getColumn.getColName.equals(rangeColumn.getColName))
    +      .get
    +
    +    // 1. Input
    +    val newHadoopConf = new Configuration(hadoopConf)
    +    newHadoopConf
    +      .set(CSVInputFormat.SELECT_COLUMN_INDEX, "" + rangeColumnIndex)
    +    val inputRDD = CsvRDDHelper
    +      .csvFileScanRDD(sparkSession, model, newHadoopConf)
    +      .mapPartitionsWithIndex { case (index, rows) =>
    +        DataLoadProcessorStepOnSpark
    +          .internalSampleInputFunc(rows, rangeField, index, modelBroadcast)
    +      }
    +
    +    // 2. Convert
    +    val conf = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext, hadoopConf)
    +    val convertRDD = inputRDD
    +      .mapPartitionsWithIndex { case (index, rows) =>
    +        ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
    +        DataLoadProcessorStepOnSpark
    +          .sampleConvertFunc(rows, rangeField, index, modelBroadcast)
    +      }
    +      .filter(_ != null)
    +
    +    convertRDD.map(row => (row.getObject(0), null))
    +  }
    +
    +  /**
    +   * calculate the number of partitions.
    +   */
    +  private def getNumPartitions(
    +      configuration: CarbonDataLoadConfiguration,
    +      model: CarbonLoadModel,
    +      convertRDD: RDD[CarbonRow]
    +  ): Int = {
    +    var numPartitions = CarbonDataProcessorUtil.getGlobalSortPartitions(
    +      configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS))
    +    if (numPartitions <= 0) {
    +      if (model.getTotalSize <= 0) {
    +        numPartitions = convertRDD.partitions.length
    +      } else {
    +        // calculate the number of partitions
    +        // better to generate a CarbonData file for each partition
    +        val totalSize = model.getTotalSize.toDouble
    +        val table = model.getCarbonDataLoadSchema.getCarbonTable
    +        val blockSize = 1024L * 1024 * table.getBlockSizeInMB
    +        val blockletSize = 1024L * 1024 * table.getBlockletSizeInMB
    +        val scaleFactor = if (model.getScaleFactor == 0) {
    +          // here it assumes the compression ratio of CarbonData is about 30%,
    +          // so it multiply by 3 to get the split size of CSV files.
    +          3
    --- End diff --
   
    Create a constant


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2971#discussion_r244681720
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
    @@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark {
           Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
         }
       }
    +
    +  /**
    +   * 1. range partition the whole input data
    +   * 2. for each range, sort the data and writ it to CarbonData files
    +   */
    +  def loadDataUsingRangeSort(
    +      sparkSession: SparkSession,
    +      model: CarbonLoadModel,
    +      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
    +    // initialize and prepare row counter
    +    val sc = sparkSession.sparkContext
    +    val modelBroadcast = sc.broadcast(model)
    +    val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
    +    val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator")
    +    val convertStepRowCounter = sc.accumulator(0, "Convert Processor Accumulator")
    +    val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
    +    val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
    +
    +    // 1. Input
    +    hadoopConf
    +      .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
    +    val inputRDD = CsvRDDHelper
    +      .csvFileScanRDD(sparkSession, model, hadoopConf)
    +      .mapPartitionsWithIndex { case (index, rows) =>
    +        DataLoadProcessorStepOnSpark
    +          .internalInputFunc(rows, index, modelBroadcast, inputStepRowCounter)
    +      }
    +
    +    // 2. Convert
    +    val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
    +    val convertRDD = inputRDD
    +      .mapPartitionsWithIndex { case (index, rows) =>
    +        ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
    +        DataLoadProcessorStepOnSpark
    +          .convertFunc(rows, index, modelBroadcast, partialSuccessAccum, convertStepRowCounter)
    +      }
    +      .filter(_ != null)
    +
    +    // 3. Range partition by range_column
    +    val configuration = DataLoadProcessBuilder.createConfiguration(model)
    +    val rangeColumnIndex =
    +      indexOfColumn(model.getRangePartitionColumn, configuration.getDataFields)
    +    // convert RDD[CarbonRow] to RDD[(rangeColumn, CarbonRow)]
    +    val keyRDD = convertRDD.keyBy(_.getObject(rangeColumnIndex))
    +    // range partition by key
    +    val numPartitions = getNumPartitions(configuration, model, convertRDD)
    +    val objectOrdering: Ordering[Object] = createOrderingForColumn(model.getRangePartitionColumn)
    +    import scala.reflect.classTag
    +    val sampleRDD = getSampleRDD(sparkSession, model, hadoopConf, configuration, modelBroadcast)
    +    val rangeRDD = keyRDD
    +      .partitionBy(
    +        new DataSkewRangePartitioner(numPartitions, sampleRDD)(objectOrdering, classTag[Object]))
    +      .map(_._2)
    +
    +    // 4. Sort and Write data
    +    sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
    +      DataLoadProcessorStepOnSpark.sortAndWriteFunc(rows, context.partitionId, modelBroadcast,
    +        writeStepRowCounter, conf.value.value))
    +
    +    // Log the number of rows in each step
    +    LOGGER.info("Total rows processed in step Input Processor: " + inputStepRowCounter.value)
    +    LOGGER.info("Total rows processed in step Data Converter: " + convertStepRowCounter.value)
    +    LOGGER.info("Total rows processed in step Sort Processor: " + sortStepRowCounter.value)
    +    LOGGER.info("Total rows processed in step Data Writer: " + writeStepRowCounter.value)
    +
    +    // Update status
    +    if (partialSuccessAccum.value != 0) {
    +      val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE +
    +                               "Partial_Success"
    +      val loadMetadataDetails = new LoadMetadataDetails()
    +      loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
    +      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
    +      executionErrors.failureCauses = FailureCauses.BAD_RECORDS
    +      Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
    +    } else {
    +      val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + "Success"
    +      val loadMetadataDetails = new LoadMetadataDetails()
    +      loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
    +      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
    +      Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
    +    }
    +  }
    +
    +  /**
    +   * provide RDD for sample
    +   * CSVRecordReader(univocity parser) will output only one column
    +   */
    +  private def getSampleRDD(
    +      sparkSession: SparkSession,
    +      model: CarbonLoadModel,
    +      hadoopConf: Configuration,
    +      configuration: CarbonDataLoadConfiguration,
    +      modelBroadcast: Broadcast[CarbonLoadModel]
    +  ): RDD[(Object, Object)] = {
    +    // initialize and prepare row counter
    +    val configuration = DataLoadProcessBuilder.createConfiguration(model)
    +    val header = configuration.getHeader
    +    val rangeColumn = model.getRangePartitionColumn
    +    val rangeColumnIndex = (0 until header.length).find{
    +      index =>
    +        header(index).equalsIgnoreCase(rangeColumn.getColName)
    +    }.get
    +    val rangeField = configuration
    +      .getDataFields
    +      .find(dataField => dataField.getColumn.getColName.equals(rangeColumn.getColName))
    +      .get
    +
    +    // 1. Input
    +    val newHadoopConf = new Configuration(hadoopConf)
    +    newHadoopConf
    +      .set(CSVInputFormat.SELECT_COLUMN_INDEX, "" + rangeColumnIndex)
    +    val inputRDD = CsvRDDHelper
    +      .csvFileScanRDD(sparkSession, model, newHadoopConf)
    +      .mapPartitionsWithIndex { case (index, rows) =>
    +        DataLoadProcessorStepOnSpark
    +          .internalSampleInputFunc(rows, rangeField, index, modelBroadcast)
    +      }
    +
    +    // 2. Convert
    +    val conf = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext, hadoopConf)
    +    val convertRDD = inputRDD
    +      .mapPartitionsWithIndex { case (index, rows) =>
    +        ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
    +        DataLoadProcessorStepOnSpark
    +          .sampleConvertFunc(rows, rangeField, index, modelBroadcast)
    +      }
    +      .filter(_ != null)
    +
    +    convertRDD.map(row => (row.getObject(0), null))
    +  }
    +
    +  /**
    +   * calculate the number of partitions.
    +   */
    +  private def getNumPartitions(
    +      configuration: CarbonDataLoadConfiguration,
    +      model: CarbonLoadModel,
    +      convertRDD: RDD[CarbonRow]
    +  ): Int = {
    +    var numPartitions = CarbonDataProcessorUtil.getGlobalSortPartitions(
    +      configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS))
    +    if (numPartitions <= 0) {
    +      if (model.getTotalSize <= 0) {
    +        numPartitions = convertRDD.partitions.length
    +      } else {
    +        // calculate the number of partitions
    +        // better to generate a CarbonData file for each partition
    +        val totalSize = model.getTotalSize.toDouble
    +        val table = model.getCarbonDataLoadSchema.getCarbonTable
    +        val blockSize = 1024L * 1024 * table.getBlockSizeInMB
    +        val blockletSize = 1024L * 1024 * table.getBlockletSizeInMB
    +        val scaleFactor = if (model.getScaleFactor == 0) {
    +          // here it assumes the compression ratio of CarbonData is about 30%,
    +          // so it multiply by 3 to get the split size of CSV files.
    +          3
    +        } else {
    +          model.getScaleFactor
    +        }
    +        val splitSize = Math.max(blockletSize, (blockSize - blockletSize)) * scaleFactor
    --- End diff --
   
    Can you add a comment what is background of doing this calculation this way?


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2971#discussion_r244681813
 
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala ---
    @@ -106,6 +106,24 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
           sql("SELECT * FROM carbon_localsort_once ORDER BY name"))
       }
     
    +  test("Make sure the result is right and sorted in global level for range_sort") {
    --- End diff --
   
    Please add some test cases with scalefactor as well.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2971#discussion_r244681963
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala ---
    @@ -0,0 +1,319 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark
    +
    +import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
    +
    +import scala.collection.mutable
    +import scala.collection.mutable.ArrayBuffer
    +import scala.reflect.ClassTag
    +import scala.util.hashing.byteswap32
    +
    +import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
    +import org.apache.spark.serializer.JavaSerializer
    +import org.apache.spark.util.{CollectionsUtils, Utils}
    +
    +/**
    + * support data skew scenario
    + * copy from spark: RangePartiitoner
    + */
    +class DataSkewRangePartitioner[K: Ordering : ClassTag, V](
    --- End diff --
   
    Please add the comment how it is different from spark's range partitioner and how you do skew partitioning


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2971: [CARBONDATA-3219] Support range partition the input ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2971
 
    @QiangCai @jackylk Adding a `RANGE_COLUMN` at each load level does not create an issue? If user selects different range column for each load how you are going to compact when you support it in future?
    what is the background of giving the range_column in load level instead of create table level?


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2971: [CARBONDATA-3219] Support range partition the input ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2971
 
    Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2327/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2971: [CARBONDATA-3219] Support range partition the input ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2971
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2121/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2971: [CARBONDATA-3219] Support range partition the input ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2971
 
    Build Failed  with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10377/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2971: [CARBONDATA-3219] Support range partition the input ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2971
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2132/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2971: [CARBONDATA-3219] Support range partition the input ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2971
 
    Build Failed  with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10386/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2971: [CARBONDATA-3219] Support range partition the input ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2971
 
    Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2338/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2971: [CARBONDATA-3219] Support range partition the input ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2971
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2133/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2971: [CARBONDATA-3219] Support range partition the input ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2971
 
    Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2339/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2971: [CARBONDATA-3219] Support range partition the input ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2971
 
    Build Failed  with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10387/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2971: [CARBONDATA-3219] Support range partition the input ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2971
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2141/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2971: [CARBONDATA-3219] Support range partition the input ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2971
 
    Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2347/



---
12345