Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2971 Build Failed with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10396/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2971 Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2366/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2971 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2153/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2971 Build Failed with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10407/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2971 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2157/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2971 Build Failed with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10411/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2971 Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2370/ --- |
In reply to this post by qiuchenjian-2
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2971#discussion_r245198317 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala --- @@ -106,6 +106,24 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo sql("SELECT * FROM carbon_localsort_once ORDER BY name")) } + test("Make sure the result is right and sorted in global level for range_sort") { --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2971#discussion_r245198367 --- Diff: integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.io.{IOException, ObjectInputStream, ObjectOutputStream} + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag +import scala.util.hashing.byteswap32 + +import org.apache.spark.rdd.{PartitionPruningRDD, RDD} +import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.util.{CollectionsUtils, Utils} + +/** + * support data skew scenario + * copy from spark: RangePartiitoner + */ +class DataSkewRangePartitioner[K: Ordering : ClassTag, V]( --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2971 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2158/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2971 Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2371/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2971 Build Failed with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10412/ --- |
In reply to this post by qiuchenjian-2
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2971#discussion_r245222242 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala --- @@ -95,6 +96,67 @@ object DataLoadProcessorStepOnSpark { } } + def internalInputFunc( + rows: Iterator[InternalRow], + index: Int, + modelBroadcast: Broadcast[CarbonLoadModel], + rowCounter: Accumulator[Int]): Iterator[CarbonRow] = { + val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString) + val conf = DataLoadProcessBuilder.createConfiguration(model) + val rowParser = new RowParserImpl(conf.getDataFields, conf) + val isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(conf) + TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) => + wrapException(e, model) + } + + new Iterator[CarbonRow] { + override def hasNext: Boolean = rows.hasNext + + override def next(): CarbonRow = { + var row : CarbonRow = null + val rawRow = + rows.next().asInstanceOf[GenericInternalRow].values.asInstanceOf[Array[Object]] + if(isRawDataRequired) { + row = new CarbonRow(rowParser.parseRow(rawRow), rawRow) + } else { + row = new CarbonRow(rowParser.parseRow(rawRow)) + } + rowCounter.add(1) + row + } + } + } + + def internalSampleInputFunc( --- End diff -- the parameter "rows" of them is a different type. --- |
In reply to this post by qiuchenjian-2
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2971#discussion_r245223174 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala --- @@ -95,6 +96,67 @@ object DataLoadProcessorStepOnSpark { } } + def internalInputFunc( + rows: Iterator[InternalRow], + index: Int, + modelBroadcast: Broadcast[CarbonLoadModel], + rowCounter: Accumulator[Int]): Iterator[CarbonRow] = { + val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString) + val conf = DataLoadProcessBuilder.createConfiguration(model) + val rowParser = new RowParserImpl(conf.getDataFields, conf) + val isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(conf) + TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) => + wrapException(e, model) + } + + new Iterator[CarbonRow] { + override def hasNext: Boolean = rows.hasNext + + override def next(): CarbonRow = { + var row : CarbonRow = null + val rawRow = + rows.next().asInstanceOf[GenericInternalRow].values.asInstanceOf[Array[Object]] + if(isRawDataRequired) { + row = new CarbonRow(rowParser.parseRow(rawRow), rawRow) + } else { + row = new CarbonRow(rowParser.parseRow(rawRow)) + } + rowCounter.add(1) + row + } + } + } + + def internalSampleInputFunc( --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2971#discussion_r245224217 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala --- @@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark { Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors))) } } + + /** + * 1. range partition the whole input data + * 2. for each range, sort the data and writ it to CarbonData files + */ + def loadDataUsingRangeSort( + sparkSession: SparkSession, + model: CarbonLoadModel, + hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = { + // initialize and prepare row counter + val sc = sparkSession.sparkContext + val modelBroadcast = sc.broadcast(model) + val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator") + val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator") + val convertStepRowCounter = sc.accumulator(0, "Convert Processor Accumulator") + val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator") + val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator") + + // 1. Input + hadoopConf + .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName) + val inputRDD = CsvRDDHelper + .csvFileScanRDD(sparkSession, model, hadoopConf) + .mapPartitionsWithIndex { case (index, rows) => + DataLoadProcessorStepOnSpark + .internalInputFunc(rows, index, modelBroadcast, inputStepRowCounter) + } + + // 2. Convert + val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf) + val convertRDD = inputRDD + .mapPartitionsWithIndex { case (index, rows) => + ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value) + DataLoadProcessorStepOnSpark + .convertFunc(rows, index, modelBroadcast, partialSuccessAccum, convertStepRowCounter) + } + .filter(_ != null) + + // 3. Range partition by range_column + val configuration = DataLoadProcessBuilder.createConfiguration(model) + val rangeColumnIndex = + indexOfColumn(model.getRangePartitionColumn, configuration.getDataFields) + // convert RDD[CarbonRow] to RDD[(rangeColumn, CarbonRow)] + val keyRDD = convertRDD.keyBy(_.getObject(rangeColumnIndex)) + // range partition by key + val numPartitions = getNumPartitions(configuration, model, convertRDD) + val objectOrdering: Ordering[Object] = createOrderingForColumn(model.getRangePartitionColumn) + import scala.reflect.classTag + val sampleRDD = getSampleRDD(sparkSession, model, hadoopConf, configuration, modelBroadcast) + val rangeRDD = keyRDD + .partitionBy( + new DataSkewRangePartitioner(numPartitions, sampleRDD)(objectOrdering, classTag[Object])) + .map(_._2) + + // 4. Sort and Write data + sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => + DataLoadProcessorStepOnSpark.sortAndWriteFunc(rows, context.partitionId, modelBroadcast, + writeStepRowCounter, conf.value.value)) + + // Log the number of rows in each step + LOGGER.info("Total rows processed in step Input Processor: " + inputStepRowCounter.value) + LOGGER.info("Total rows processed in step Data Converter: " + convertStepRowCounter.value) + LOGGER.info("Total rows processed in step Sort Processor: " + sortStepRowCounter.value) + LOGGER.info("Total rows processed in step Data Writer: " + writeStepRowCounter.value) + + // Update status + if (partialSuccessAccum.value != 0) { + val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + + "Partial_Success" + val loadMetadataDetails = new LoadMetadataDetails() + loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS) + val executionErrors = new ExecutionErrors(FailureCauses.NONE, "") + executionErrors.failureCauses = FailureCauses.BAD_RECORDS + Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors))) + } else { + val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + "Success" + val loadMetadataDetails = new LoadMetadataDetails() + loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS) + val executionErrors = new ExecutionErrors(FailureCauses.NONE, "") + Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors))) + } + } + + /** + * provide RDD for sample + * CSVRecordReader(univocity parser) will output only one column + */ + private def getSampleRDD( + sparkSession: SparkSession, + model: CarbonLoadModel, + hadoopConf: Configuration, + configuration: CarbonDataLoadConfiguration, + modelBroadcast: Broadcast[CarbonLoadModel] + ): RDD[(Object, Object)] = { + // initialize and prepare row counter + val configuration = DataLoadProcessBuilder.createConfiguration(model) + val header = configuration.getHeader + val rangeColumn = model.getRangePartitionColumn + val rangeColumnIndex = (0 until header.length).find{ + index => + header(index).equalsIgnoreCase(rangeColumn.getColName) + }.get + val rangeField = configuration + .getDataFields + .find(dataField => dataField.getColumn.getColName.equals(rangeColumn.getColName)) + .get + + // 1. Input + val newHadoopConf = new Configuration(hadoopConf) + newHadoopConf + .set(CSVInputFormat.SELECT_COLUMN_INDEX, "" + rangeColumnIndex) + val inputRDD = CsvRDDHelper + .csvFileScanRDD(sparkSession, model, newHadoopConf) + .mapPartitionsWithIndex { case (index, rows) => + DataLoadProcessorStepOnSpark + .internalSampleInputFunc(rows, rangeField, index, modelBroadcast) + } + + // 2. Convert + val conf = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext, hadoopConf) + val convertRDD = inputRDD + .mapPartitionsWithIndex { case (index, rows) => + ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value) + DataLoadProcessorStepOnSpark + .sampleConvertFunc(rows, rangeField, index, modelBroadcast) + } + .filter(_ != null) + + convertRDD.map(row => (row.getObject(0), null)) + } + + /** + * calculate the number of partitions. + */ + private def getNumPartitions( + configuration: CarbonDataLoadConfiguration, + model: CarbonLoadModel, + convertRDD: RDD[CarbonRow] + ): Int = { + var numPartitions = CarbonDataProcessorUtil.getGlobalSortPartitions( + configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS)) + if (numPartitions <= 0) { + if (model.getTotalSize <= 0) { + numPartitions = convertRDD.partitions.length + } else { + // calculate the number of partitions + // better to generate a CarbonData file for each partition + val totalSize = model.getTotalSize.toDouble + val table = model.getCarbonDataLoadSchema.getCarbonTable + val blockSize = 1024L * 1024 * table.getBlockSizeInMB + val blockletSize = 1024L * 1024 * table.getBlockletSizeInMB + val scaleFactor = if (model.getScaleFactor == 0) { + // here it assumes the compression ratio of CarbonData is about 30%, + // so it multiply by 3 to get the split size of CSV files. + 3 --- End diff -- OK --- |
In reply to this post by qiuchenjian-2
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2971#discussion_r245224484 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala --- @@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark { Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors))) } } + + /** + * 1. range partition the whole input data + * 2. for each range, sort the data and writ it to CarbonData files + */ + def loadDataUsingRangeSort( + sparkSession: SparkSession, + model: CarbonLoadModel, + hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = { + // initialize and prepare row counter + val sc = sparkSession.sparkContext + val modelBroadcast = sc.broadcast(model) + val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator") + val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator") + val convertStepRowCounter = sc.accumulator(0, "Convert Processor Accumulator") + val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator") + val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator") + + // 1. Input + hadoopConf + .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName) + val inputRDD = CsvRDDHelper + .csvFileScanRDD(sparkSession, model, hadoopConf) + .mapPartitionsWithIndex { case (index, rows) => + DataLoadProcessorStepOnSpark + .internalInputFunc(rows, index, modelBroadcast, inputStepRowCounter) + } + + // 2. Convert + val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf) + val convertRDD = inputRDD + .mapPartitionsWithIndex { case (index, rows) => + ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value) + DataLoadProcessorStepOnSpark + .convertFunc(rows, index, modelBroadcast, partialSuccessAccum, convertStepRowCounter) + } + .filter(_ != null) + + // 3. Range partition by range_column + val configuration = DataLoadProcessBuilder.createConfiguration(model) + val rangeColumnIndex = + indexOfColumn(model.getRangePartitionColumn, configuration.getDataFields) + // convert RDD[CarbonRow] to RDD[(rangeColumn, CarbonRow)] + val keyRDD = convertRDD.keyBy(_.getObject(rangeColumnIndex)) + // range partition by key + val numPartitions = getNumPartitions(configuration, model, convertRDD) + val objectOrdering: Ordering[Object] = createOrderingForColumn(model.getRangePartitionColumn) + import scala.reflect.classTag + val sampleRDD = getSampleRDD(sparkSession, model, hadoopConf, configuration, modelBroadcast) + val rangeRDD = keyRDD + .partitionBy( + new DataSkewRangePartitioner(numPartitions, sampleRDD)(objectOrdering, classTag[Object])) + .map(_._2) + + // 4. Sort and Write data + sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => + DataLoadProcessorStepOnSpark.sortAndWriteFunc(rows, context.partitionId, modelBroadcast, + writeStepRowCounter, conf.value.value)) + + // Log the number of rows in each step + LOGGER.info("Total rows processed in step Input Processor: " + inputStepRowCounter.value) + LOGGER.info("Total rows processed in step Data Converter: " + convertStepRowCounter.value) + LOGGER.info("Total rows processed in step Sort Processor: " + sortStepRowCounter.value) + LOGGER.info("Total rows processed in step Data Writer: " + writeStepRowCounter.value) + + // Update status + if (partialSuccessAccum.value != 0) { + val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + + "Partial_Success" + val loadMetadataDetails = new LoadMetadataDetails() + loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS) + val executionErrors = new ExecutionErrors(FailureCauses.NONE, "") + executionErrors.failureCauses = FailureCauses.BAD_RECORDS + Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors))) + } else { + val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + "Success" + val loadMetadataDetails = new LoadMetadataDetails() + loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS) + val executionErrors = new ExecutionErrors(FailureCauses.NONE, "") + Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors))) + } + } + + /** + * provide RDD for sample + * CSVRecordReader(univocity parser) will output only one column + */ + private def getSampleRDD( + sparkSession: SparkSession, + model: CarbonLoadModel, + hadoopConf: Configuration, + configuration: CarbonDataLoadConfiguration, + modelBroadcast: Broadcast[CarbonLoadModel] + ): RDD[(Object, Object)] = { + // initialize and prepare row counter + val configuration = DataLoadProcessBuilder.createConfiguration(model) + val header = configuration.getHeader + val rangeColumn = model.getRangePartitionColumn + val rangeColumnIndex = (0 until header.length).find{ + index => + header(index).equalsIgnoreCase(rangeColumn.getColName) + }.get + val rangeField = configuration + .getDataFields + .find(dataField => dataField.getColumn.getColName.equals(rangeColumn.getColName)) + .get + + // 1. Input + val newHadoopConf = new Configuration(hadoopConf) + newHadoopConf + .set(CSVInputFormat.SELECT_COLUMN_INDEX, "" + rangeColumnIndex) + val inputRDD = CsvRDDHelper + .csvFileScanRDD(sparkSession, model, newHadoopConf) + .mapPartitionsWithIndex { case (index, rows) => + DataLoadProcessorStepOnSpark + .internalSampleInputFunc(rows, rangeField, index, modelBroadcast) + } + + // 2. Convert + val conf = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext, hadoopConf) + val convertRDD = inputRDD + .mapPartitionsWithIndex { case (index, rows) => + ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value) + DataLoadProcessorStepOnSpark + .sampleConvertFunc(rows, rangeField, index, modelBroadcast) + } + .filter(_ != null) + + convertRDD.map(row => (row.getObject(0), null)) + } + + /** + * calculate the number of partitions. + */ + private def getNumPartitions( + configuration: CarbonDataLoadConfiguration, + model: CarbonLoadModel, + convertRDD: RDD[CarbonRow] + ): Int = { + var numPartitions = CarbonDataProcessorUtil.getGlobalSortPartitions( + configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS)) + if (numPartitions <= 0) { + if (model.getTotalSize <= 0) { + numPartitions = convertRDD.partitions.length + } else { + // calculate the number of partitions + // better to generate a CarbonData file for each partition + val totalSize = model.getTotalSize.toDouble + val table = model.getCarbonDataLoadSchema.getCarbonTable + val blockSize = 1024L * 1024 * table.getBlockSizeInMB + val blockletSize = 1024L * 1024 * table.getBlockletSizeInMB + val scaleFactor = if (model.getScaleFactor == 0) { + // here it assumes the compression ratio of CarbonData is about 30%, + // so it multiply by 3 to get the split size of CSV files. + 3 + } else { + model.getScaleFactor + } + val splitSize = Math.max(blockletSize, (blockSize - blockletSize)) * scaleFactor --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2971#discussion_r245237625 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala --- @@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark { Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors))) } } + + /** + * 1. range partition the whole input data + * 2. for each range, sort the data and writ it to CarbonData files + */ + def loadDataUsingRangeSort( + sparkSession: SparkSession, + model: CarbonLoadModel, + hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = { + // initialize and prepare row counter + val sc = sparkSession.sparkContext + val modelBroadcast = sc.broadcast(model) + val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator") + val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator") + val convertStepRowCounter = sc.accumulator(0, "Convert Processor Accumulator") + val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator") + val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator") + + // 1. Input + hadoopConf + .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName) + val inputRDD = CsvRDDHelper + .csvFileScanRDD(sparkSession, model, hadoopConf) + .mapPartitionsWithIndex { case (index, rows) => + DataLoadProcessorStepOnSpark + .internalInputFunc(rows, index, modelBroadcast, inputStepRowCounter) + } + + // 2. Convert + val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf) + val convertRDD = inputRDD + .mapPartitionsWithIndex { case (index, rows) => + ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value) + DataLoadProcessorStepOnSpark + .convertFunc(rows, index, modelBroadcast, partialSuccessAccum, convertStepRowCounter) + } + .filter(_ != null) + + // 3. Range partition by range_column + val configuration = DataLoadProcessBuilder.createConfiguration(model) + val rangeColumnIndex = + indexOfColumn(model.getRangePartitionColumn, configuration.getDataFields) + // convert RDD[CarbonRow] to RDD[(rangeColumn, CarbonRow)] + val keyRDD = convertRDD.keyBy(_.getObject(rangeColumnIndex)) + // range partition by key + val numPartitions = getNumPartitions(configuration, model, convertRDD) + val objectOrdering: Ordering[Object] = createOrderingForColumn(model.getRangePartitionColumn) + import scala.reflect.classTag + val sampleRDD = getSampleRDD(sparkSession, model, hadoopConf, configuration, modelBroadcast) + val rangeRDD = keyRDD + .partitionBy( + new DataSkewRangePartitioner(numPartitions, sampleRDD)(objectOrdering, classTag[Object])) + .map(_._2) + + // 4. Sort and Write data + sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => + DataLoadProcessorStepOnSpark.sortAndWriteFunc(rows, context.partitionId, modelBroadcast, + writeStepRowCounter, conf.value.value)) + + // Log the number of rows in each step + LOGGER.info("Total rows processed in step Input Processor: " + inputStepRowCounter.value) + LOGGER.info("Total rows processed in step Data Converter: " + convertStepRowCounter.value) + LOGGER.info("Total rows processed in step Sort Processor: " + sortStepRowCounter.value) + LOGGER.info("Total rows processed in step Data Writer: " + writeStepRowCounter.value) + + // Update status + if (partialSuccessAccum.value != 0) { + val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + + "Partial_Success" + val loadMetadataDetails = new LoadMetadataDetails() + loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS) + val executionErrors = new ExecutionErrors(FailureCauses.NONE, "") + executionErrors.failureCauses = FailureCauses.BAD_RECORDS + Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors))) + } else { + val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + "Success" + val loadMetadataDetails = new LoadMetadataDetails() + loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS) + val executionErrors = new ExecutionErrors(FailureCauses.NONE, "") + Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors))) + } + } + + /** + * provide RDD for sample + * CSVRecordReader(univocity parser) will output only one column + */ + private def getSampleRDD( + sparkSession: SparkSession, + model: CarbonLoadModel, + hadoopConf: Configuration, + configuration: CarbonDataLoadConfiguration, + modelBroadcast: Broadcast[CarbonLoadModel] + ): RDD[(Object, Object)] = { + // initialize and prepare row counter + val configuration = DataLoadProcessBuilder.createConfiguration(model) + val header = configuration.getHeader + val rangeColumn = model.getRangePartitionColumn + val rangeColumnIndex = (0 until header.length).find{ + index => + header(index).equalsIgnoreCase(rangeColumn.getColName) + }.get + val rangeField = configuration + .getDataFields + .find(dataField => dataField.getColumn.getColName.equals(rangeColumn.getColName)) + .get + + // 1. Input + val newHadoopConf = new Configuration(hadoopConf) + newHadoopConf + .set(CSVInputFormat.SELECT_COLUMN_INDEX, "" + rangeColumnIndex) + val inputRDD = CsvRDDHelper + .csvFileScanRDD(sparkSession, model, newHadoopConf) + .mapPartitionsWithIndex { case (index, rows) => + DataLoadProcessorStepOnSpark + .internalSampleInputFunc(rows, rangeField, index, modelBroadcast) + } + + // 2. Convert + val conf = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext, hadoopConf) --- End diff -- SerializableConfiguration is private in spark package --- |
In reply to this post by qiuchenjian-2
Github user QiangCai commented on the issue:
https://github.com/apache/carbondata/pull/2971 @ravipesala After the compaction, it will become local_sort. In my opinion, we can use Range_column to partition the input data. So it can reduce the scope of sorting during data loading to improve data loading performance. In some case, it also can improve the query performance (like Global_sort). --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2971 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2159/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2971 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2372/ --- |
Free forum by Nabble | Edit this page |