Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1972 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3727/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1972 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2487/ --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1972 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3525/ --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1972 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3526/ --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1972 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3528/ --- |
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1972#discussion_r168702275 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java --- @@ -279,18 +279,20 @@ public boolean isPartionedSegment() { * dropped * @throws IOException */ - public void dropPartitions(String segmentPath, List<String> partitionsToDrop, String uniqueId, - boolean partialMatch) throws IOException { + public void dropPartitions(String segmentPath, List<List<String>> partitionsToDrop, --- End diff -- All the callers right now pass only a single list for "partitionsToDrop", so I think we can keep it as a single list. Is there a special purpose for the nested list? --- |
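A minimal sketch of the single-list signature being suggested, which is essentially the variant the diff removes (parameter names follow the diff; the body is elided):

```java
// Sketch: flat-list variant the reviewer prefers, since current callers only ever
// pass one partition-spec list. Parameter names follow the diff above.
public void dropPartitions(String segmentPath, List<String> partitionsToDrop,
    String uniqueId, boolean partialMatch) throws IOException {
  // ... existing drop logic, iterating partitionsToDrop directly ...
}
```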
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1972#discussion_r168696420 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala --- @@ -59,10 +61,12 @@ class CarbonDropPartitionRDD( val iter = new Iterator[String] { val split = theSplit.asInstanceOf[CarbonDropPartition] logInfo("Dropping partition information from : " + split.segmentPath) - + partitions.toList.asJava --- End diff -- Please remove this unused code --- |
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1972#discussion_r168677700 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java --- @@ -122,7 +119,11 @@ public CarbonOutputCommitter(Path outputPath, TaskAttemptContext context) throws throw new IOException(e); } } - CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, overwriteSet); + String uniqueId = null; + if (overwriteSet) { + uniqueId = overwritePartitions(loadModel); + } + CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false); --- End diff -- As per the event listener model, we should update the table status file before firing the postStatusUpdateEvent. Is there a specific reason for updating the table status file after the post event? --- |
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1972#discussion_r168677068 --- Diff: core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java --- @@ -104,6 +104,38 @@ public static Object getMeasureValueBasedOnDataType(String msrValue, DataType da } } + /** + * This method will convert a given value to its specific type + * + * @param msrValue + * @param dataType + * @param carbonMeasure + * @return + */ + public static Object getConvertedMeasureValueBasedOnDataType(String msrValue, DataType dataType, --- End diff -- We have a similar method, getMeasureValueBasedOnDataType, in the DataTypeUtil class. The only difference in the implementation is for the decimal data type. I think we can refactor and put the common code in one private method. --- |
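A minimal sketch of the refactoring being suggested, assuming the two methods differ only in how the decimal branch produces its value. The wrapper class, the helper name, and the branch bodies are illustrative, not the project's actual code:

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;

// Hypothetical holder class for the sketch; in the PR both methods live in DataTypeUtil.
public final class MeasureValueSketch {

  public static Object getMeasureValueBasedOnDataType(String msrValue, DataType dataType,
      CarbonMeasure carbonMeasure) {
    return parseMeasureValue(msrValue, dataType, carbonMeasure, false);
  }

  public static Object getConvertedMeasureValueBasedOnDataType(String msrValue, DataType dataType,
      CarbonMeasure carbonMeasure) {
    return parseMeasureValue(msrValue, dataType, carbonMeasure, true);
  }

  // Common code lives in one private method; only the decimal branch differs.
  private static Object parseMeasureValue(String msrValue, DataType dataType,
      CarbonMeasure carbonMeasure, boolean convertDecimal) {
    if (DataTypes.isDecimal(dataType)) {
      BigDecimal value =
          new BigDecimal(msrValue).setScale(carbonMeasure.getScale(), RoundingMode.HALF_UP);
      // the single point of divergence between the two public methods
      return convertDecimal ? convertDecimalValue(value) : value;
    } else if (dataType == DataTypes.SHORT) {
      return Short.parseShort(msrValue);
    } else if (dataType == DataTypes.INT) {
      return Integer.parseInt(msrValue);
    } else if (dataType == DataTypes.LONG) {
      return Long.parseLong(msrValue);
    } else {
      return Double.parseDouble(msrValue);
    }
  }

  private static Object convertDecimalValue(BigDecimal value) {
    // placeholder for whatever extra conversion the new method applies to decimals
    return value;
  }
}
```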
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1972#discussion_r168699514 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala --- @@ -676,6 +734,48 @@ case class CarbonLoadDataCommand( } } + private def convertData( + originRDD: RDD[Row], + sparkSession: SparkSession, + model: CarbonLoadModel, + isDataFrame: Boolean): RDD[InternalRow] = { + model.setPartitionId("0") + val sc = sparkSession.sparkContext + val modelBroadcast = sc.broadcast(model) + val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator") + + val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator") + // 1. Input + var convertRDD = --- End diff -- this var can be val --- |
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1972#discussion_r168700095 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala --- @@ -676,6 +734,48 @@ case class CarbonLoadDataCommand( } } + private def convertData( + originRDD: RDD[Row], + sparkSession: SparkSession, + model: CarbonLoadModel, + isDataFrame: Boolean): RDD[InternalRow] = { + model.setPartitionId("0") + val sc = sparkSession.sparkContext + val modelBroadcast = sc.broadcast(model) + val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator") + + val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator") + // 1. Input + var convertRDD = + if (isDataFrame) { + originRDD.mapPartitions{rows => + DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast) + } + } else { + originRDD.map{row => + val array = new Array[AnyRef](row.length) + var i = 0 + while (i < array.length) { + array(i) = row.get(i).asInstanceOf[AnyRef] + i = i + 1 + } + array + } + } + val finalRDD = convertRDD.mapPartitionsWithIndex { case (index, rows) => + DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl) --- End diff -- setDataTypeConverter is a static method. I think in concurrent scenarios, if this is called from multiple places, this value can get overridden. --- |
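One way to address that concern (a sketch only, not the fix made in the PR) is to make the static setter idempotent so concurrent tasks cannot keep replacing each other's converter. The holder class and the AtomicReference guard shown here are hypothetical:

```java
import java.util.concurrent.atomic.AtomicReference;

import org.apache.carbondata.core.util.DataTypeConverter;

// Hypothetical guard: set the shared converter at most once, so concurrent callers
// (e.g. multiple Spark tasks in one executor) do not override each other's instance.
public final class ConverterHolder {

  private static final AtomicReference<DataTypeConverter> CONVERTER = new AtomicReference<>();

  public static void setDataTypeConverter(DataTypeConverter converter) {
    // only the first caller wins; later calls are no-ops
    CONVERTER.compareAndSet(null, converter);
  }

  public static DataTypeConverter getDataTypeConverter() {
    return CONVERTER.get();
  }
}
```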
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1972#discussion_r168416054 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java --- @@ -217,6 +217,6 @@ public void convertValue(ColumnPageValueConverter codec) { @Override public void freeMemory() { - + byteArrayData = null; --- End diff -- Other types like intData and longData can also be used, so please set the references to null for all of the types here. --- |
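A small sketch of what that could look like inside SafeDecimalColumnPage.freeMemory(); byteArrayData comes from the diff and intData/longData from the comment, while the other field names are assumptions about the page's remaining backing arrays:

```java
@Override
public void freeMemory() {
  // drop every backing array the page may have filled, not only the byte[][] data
  byteArrayData = null;
  byteData = null;     // assumed field name
  shortData = null;    // assumed field name
  shortIntData = null; // assumed field name
  intData = null;
  longData = null;
}
```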
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1972#discussion_r168700461 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala --- @@ -544,17 +547,76 @@ case class CarbonLoadDataCommand( CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD, isEmptyBadRecord) CarbonSession.threadSet("partition.operationcontext", operationContext) + // input data from csv files. Convert to logical plan --- End diff -- In all the above lines wherever threadSet is used, we clone a new CarbonSessionInfo object every time. As it is an internal call, we can write an overloaded method and pass a boolean to avoid creating a new object every time. --- |
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1972#discussion_r168705495 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala --- @@ -592,41 +666,12 @@ case class CarbonLoadDataCommand( case _ => false } } - val len = rowDataTypes.length - var rdd = - DataLoadingUtil.csvFileScanRDD( - sparkSession, - model = carbonLoadModel, - hadoopConf) - .map { row => - val data = new Array[Any](len) - var i = 0 - val input = row.asInstanceOf[GenericInternalRow].values.asInstanceOf[Array[String]] - val inputLen = Math.min(input.length, len) - while (i < inputLen) { - data(i) = UTF8String.fromString(input(i)) - // If partition column then update empty value with special string otherwise spark - // makes it as null so we cannot internally handle badrecords. - if (partitionColumns(i)) { - if (input(i) != null && input(i).isEmpty) { - data(i) = UTF8String.fromString(CarbonCommonConstants.MEMBER_DEFAULT_VAL) - } - } - i = i + 1 - } - InternalRow.fromSeq(data) - - } - // Only select the required columns - val output = if (partition.nonEmpty) { - val lowerCasePartition = partition.map{case(key, value) => (key.toLowerCase, value)} - catalogTable.schema.map { attr => - attributes.find(_.name.equalsIgnoreCase(attr.name)).get - }.filter(attr => lowerCasePartition.getOrElse(attr.name.toLowerCase, None).isEmpty) - } else { - catalogTable.schema.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get) - } - Project(output, LogicalRDD(attributes, rdd)(sparkSession)) + val columnCount = carbonLoadModel.getCsvHeaderColumns.length + var rdd = DataLoadingUtil.csvFileScanRDD( --- End diff -- this var can be val --- |
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1972#discussion_r168714751 --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java --- @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.processing.loading.steps; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep; +import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration; +import org.apache.carbondata.processing.loading.DataField; +import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl; +import org.apache.carbondata.processing.loading.row.CarbonRowBatch; +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; + +/** + * It reads data from record reader and sends data to next step. + */ +public class InputProcessorStepForPartitionImpl extends AbstractDataLoadProcessorStep { + + private CarbonIterator<Object[]>[] inputIterators; + + private boolean[] noDictionaryMapping; + + private DataType[] dataTypes; + + private int[] orderOfData; + + public InputProcessorStepForPartitionImpl(CarbonDataLoadConfiguration configuration, + CarbonIterator<Object[]>[] inputIterators) { + super(configuration, null); + this.inputIterators = inputIterators; + } + + @Override public DataField[] getOutput() { + return configuration.getDataFields(); + } + + @Override public void initialize() throws IOException { + super.initialize(); + // if logger is enabled then raw data will be required. 
+ RowConverterImpl rowConverter = + new RowConverterImpl(configuration.getDataFields(), configuration, null); + rowConverter.initialize(); + configuration.setCardinalityFinder(rowConverter); + noDictionaryMapping = + CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields()); + dataTypes = new DataType[configuration.getDataFields().length]; + for (int i = 0; i < dataTypes.length; i++) { + if (configuration.getDataFields()[i].getColumn().hasEncoding(Encoding.DICTIONARY)) { + dataTypes[i] = DataTypes.INT; + } else { + dataTypes[i] = configuration.getDataFields()[i].getColumn().getDataType(); + } + } + orderOfData = arrangeData(configuration.getDataFields(), configuration.getHeader()); + } + + private int[] arrangeData(DataField[] dataFields, String[] header) { + int[] data = new int[dataFields.length]; + for (int i = 0; i < dataFields.length; i++) { + for (int j = 0; j < header.length; j++) { + if (dataFields[i].getColumn().getColName().equalsIgnoreCase(header[j])) { + data[i] = j; + break; + } + } + } + return data; + } + + @Override public Iterator<CarbonRowBatch>[] execute() { + int batchSize = CarbonProperties.getInstance().getBatchSize(); + List<CarbonIterator<Object[]>>[] readerIterators = partitionInputReaderIterators(); + Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length]; + for (int i = 0; i < outIterators.length; i++) { + outIterators[i] = + new InputProcessorIterator(readerIterators[i], batchSize, configuration.isPreFetch(), + rowCounter, orderOfData, noDictionaryMapping, dataTypes); + } + return outIterators; + } + + /** + * Partition input iterators equally as per the number of threads. + * + * @return + */ + private List<CarbonIterator<Object[]>>[] partitionInputReaderIterators() { + // Get the number of cores configured in property. + int numberOfCores = CarbonProperties.getInstance().getNumberOfCores(); + // Get the minimum of number of cores and iterators size to get the number of parallel threads + // to be launched. + int parallelThreadNumber = Math.min(inputIterators.length, numberOfCores); + + List<CarbonIterator<Object[]>>[] iterators = new List[parallelThreadNumber]; + for (int i = 0; i < parallelThreadNumber; i++) { + iterators[i] = new ArrayList<>(); + } + // Equally partition the iterators as per number of threads + for (int i = 0; i < inputIterators.length; i++) { + iterators[i % parallelThreadNumber].add(inputIterators[i]); + } + return iterators; + } + + @Override protected CarbonRow processRow(CarbonRow row) { + return null; + } + + @Override public void close() { + if (!closed) { + super.close(); + for (CarbonIterator inputIterator : inputIterators) { + inputIterator.close(); --- End diff -- Better to put in try and finally block...even if one iterator fails to close still other iterators can be closed --- |
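For illustration, a sketch of the close() shape the reviewer is asking for, guarding each iterator so one failure does not prevent the rest from closing. This uses a per-iterator try/catch rather than a literal try/finally, and the error handling shown is a placeholder (a real implementation would log the failure):

```java
@Override public void close() {
  if (!closed) {
    super.close();
    for (CarbonIterator inputIterator : inputIterators) {
      try {
        inputIterator.close();
      } catch (RuntimeException e) {
        // placeholder: record/log the failure and continue so the remaining
        // iterators still get closed
      }
    }
  }
}
```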
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1972#discussion_r169982277 --- Diff: core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java --- @@ -104,6 +104,38 @@ public static Object getMeasureValueBasedOnDataType(String msrValue, DataType da } } + /** + * This method will convert a given value to its specific type + * + * @param msrValue + * @param dataType + * @param carbonMeasure + * @return + */ + public static Object getConvertedMeasureValueBasedOnDataType(String msrValue, DataType dataType, --- End diff -- ok --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1972#discussion_r169983220 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java --- @@ -122,7 +119,11 @@ public CarbonOutputCommitter(Path outputPath, TaskAttemptContext context) throws throw new IOException(e); } } - CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, overwriteSet); + String uniqueId = null; + if (overwriteSet) { + uniqueId = overwritePartitions(loadModel); + } + CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false); --- End diff -- ok, changed --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1972#discussion_r169983469 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala --- @@ -676,6 +734,48 @@ case class CarbonLoadDataCommand( } } + private def convertData( + originRDD: RDD[Row], + sparkSession: SparkSession, + model: CarbonLoadModel, + isDataFrame: Boolean): RDD[InternalRow] = { + model.setPartitionId("0") + val sc = sparkSession.sparkContext + val modelBroadcast = sc.broadcast(model) + val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator") + + val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator") + // 1. Input + var convertRDD = --- End diff -- ok --- |
Github user gvramana commented on the issue:
https://github.com/apache/carbondata/pull/1972 LGTM, a few comments are handled as part of #1984 --- |