ajantha-bhat commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#issuecomment-585136781 @QiangCai : OK rebased ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#issuecomment-585146200 Build Success with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/260/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#issuecomment-585167813 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1963/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
ajantha-bhat commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#issuecomment-585231947 retest this please ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#issuecomment-585241664 Build Success with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/266/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#issuecomment-585270911 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1969/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
QiangCai commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378286697 ########## File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala ########## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command.management + +import java.util + +import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession} +import org.apache.spark.sql.execution.command.{DataLoadTableFileMapping, UpdateTableModel} +import org.apache.spark.util.CausedBy + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.schema.table.TableInfo +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} +import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.events.OperationContext +import org.apache.carbondata.events.exception.PreEventException +import org.apache.carbondata.processing.loading.TableProcessingOperations +import org.apache.carbondata.processing.loading.exception.NoRetryException +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.processing.util.CarbonLoaderUtil +import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory + +/* +* insert into with df, doesn't use logical plan +* +*/ +case class CarbonInsertIntoWithDf(databaseNameOp: Option[String], + tableName: String, + options: Map[String, String], + isOverwriteTable: Boolean, + var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(), Review comment: not required ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
QiangCai commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378286214 ########## File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala ########## @@ -17,85 +17,517 @@ package org.apache.spark.sql.execution.command.management -import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession} -import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan} -import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataCommand} -import org.apache.spark.storage.StorageLevel - -import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} - -case class CarbonInsertIntoCommand( - relation: CarbonDatasourceHadoopRelation, - child: LogicalPlan, - overwrite: Boolean, - partition: Map[String, Option[String]]) +import java.text.SimpleDateFormat +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{AnalysisException, CarbonEnv, CarbonToSparkAdapter, DataFrame, Dataset, Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Literal, NamedExpression} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} +import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.util.CausedBy + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.converter.SparkDataTypeConverterImpl +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema} +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} +import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.events.exception.PreEventException +import org.apache.carbondata.events.OperationContext +import org.apache.carbondata.processing.loading.TableProcessingOperations +import org.apache.carbondata.processing.loading.exception.NoRetryException +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.processing.util.CarbonLoaderUtil +import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory +import org.apache.carbondata.spark.util.CarbonScalaUtil + +/* +* insert into without df, by just using logical plan +* +*/ +case class CarbonInsertIntoCommand(databaseNameOp: Option[String], + tableName: String, + options: Map[String, String], + isOverwriteTable: Boolean, + var logicalPlan: LogicalPlan, + var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(), Review comment: not required ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
QiangCai commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378690024 ########## File path: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala ########## @@ -140,6 +141,89 @@ object CarbonScalaUtil { } } + /** + * Converts incoming value to String after converting data as per the data type. + * + * @param value Input value to convert + * @param dataType Datatype to convert and then convert to String + * @param timeStampFormat Timestamp format to convert in case of timestamp datatypes + * @param dateFormat DataFormat to convert in case of DateType datatype + * @return converted String + */ + def convertStaticPartitionToValues( + value: String, + dataType: DataType, + timeStampFormat: SimpleDateFormat, + dateFormat: SimpleDateFormat): AnyRef = { + val defaultValue = value != null && value.equalsIgnoreCase(hiveDefaultPartition) + try { + dataType match { + case TimestampType if timeStampFormat != null => + val formattedString = + if (defaultValue) { + timeStampFormat.format(new Date()) + } else { + timeStampFormat.format(DateTimeUtils.stringToTime(value)) + } + val convertedValue = + DataTypeUtil + .getDataBasedOnDataType(formattedString, + CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(TimestampType)) + convertedValue + case DateType if dateFormat != null => + val formattedString = + if (defaultValue) { + dateFormat.format(new Date()) + } else { + dateFormat.format(DateTimeUtils.stringToTime(value)) + } + val convertedValue = + DataTypeUtil + .getDataBasedOnDataType(formattedString, + CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(TimestampType)) + val date = generateDictionaryKey(convertedValue.asInstanceOf[Long]) + date.asInstanceOf[AnyRef] + case BinaryType => + // TODO: decode required ? currently it is working + ByteUtil.toBytes(value) + case _ => + val convertedValue = + DataTypeUtil + .getDataBasedOnDataType(value, + CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataType)) + if (convertedValue == null) { + if (defaultValue) { + dataType match { + case BooleanType => + return false.asInstanceOf[AnyRef] + case _ => + return 0.asInstanceOf[AnyRef] + } + } + throw new MalformedCarbonCommandException( + s"Value $value with datatype $dataType on static partition is not correct") + } + convertedValue + } + } catch { + case e: Exception => + throw new MalformedCarbonCommandException( + s"Value $value with datatype $dataType on static partition is not correct") + } + } + + def generateDictionaryKey(timeValue: Long): Int = { + if (timeValue < DateDirectDictionaryGenerator.MIN_VALUE || + timeValue > DateDirectDictionaryGenerator.MAX_VALUE) { + if (LOGGER.isDebugEnabled) { + LOGGER.debug("Value for date type column is not in valid range. Value considered as null.") + } + return CarbonCommonConstants.DIRECT_DICT_VALUE_NULL + } + Math.floor(timeValue.toDouble / DateDirectDictionaryGenerator.MILLIS_PER_DAY).toInt + + DateDirectDictionaryGenerator.cutOffDate Review comment: please fix the code style ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
QiangCai commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378692400 ########## File path: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ########## @@ -304,11 +305,11 @@ object CarbonDataRDDFactory { def loadCarbonData( sqlContext: SQLContext, carbonLoadModel: CarbonLoadModel, - columnar: Boolean, partitionStatus: SegmentStatus = SegmentStatus.SUCCESS, overwriteTable: Boolean, hadoopConf: Configuration, dataFrame: Option[DataFrame] = None, + scanResultRdd : Option[RDD[InternalRow]], Review comment: provide a default value ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
QiangCai commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378299353 ########## File path: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala ########## @@ -150,10 +150,14 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach { sql("drop table if exists load32000chardata") sql("drop table if exists load32000chardata_dup") sql("CREATE TABLE load32000chardata(dim1 String, dim2 String, mes1 int) STORED AS carbondata") - sql("CREATE TABLE load32000chardata_dup(dim1 String, dim2 String, mes1 int) STORED AS carbondata") + sql("CREATE TABLE load32000chardata_dup(dim1 String, dim2 String, mes1 int) STORED AS " + + "carbondata tblproperties('local_dictionary_enable'='false')") sql(s"LOAD DATA LOCAL INPATH '$testdata' into table load32000chardata OPTIONS('FILEHEADER'='dim1,dim2,mes1')") + // Previously converter step was checking more than 32k length and throwing exception. + // Now, due to local dictionary is true. Insert will not fail. + // when local dictionary is false, insert will fail at write step intercept[Exception] { - sql("insert into load32000chardata_dup select dim1,concat(load32000chardata.dim2,'aaaa'),mes1 from load32000chardata").show() + sql("insert into load32000chardata_dup select dim1,concat(load32000chardata.dim2,load32000chardata.dim2),mes1 from load32000chardata").show() Review comment: why change it? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
QiangCai commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378715601 ########## File path: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala ########## @@ -263,17 +265,35 @@ class NewDataFrameLoaderRDD[K, V]( carbonLoadModel.setPreFetch(false) val recordReaders = mutable.Buffer[CarbonIterator[Array[AnyRef]]]() - val partitionIterator = firstParent[DataLoadPartitionWrap[Row]].iterator(theSplit, context) val serializer = SparkEnv.get.closureSerializer.newInstance() var serializeBytes: Array[Byte] = null - while(partitionIterator.hasNext) { - val value = partitionIterator.next() - if (serializeBytes == null) { - serializeBytes = serializer.serialize[RDD[Row]](value.rdd).array() + if (isDataFrame) { + val partitionIterator = firstParent[DataLoadPartitionWrap[Row]].iterator(theSplit, + context) + while (partitionIterator.hasNext) { + val value = partitionIterator.next() + if (serializeBytes == null) { + serializeBytes = serializer.serialize[RDD[Row]](value.rdd).array() + } + recordReaders += new LazyRddIterator(serializer, serializeBytes, value.partition, + carbonLoadModel, context) } - recordReaders += new LazyRddIterator(serializer, serializeBytes, value.partition, + } else { + // For internal row, no need of converter and re-arrange step, + model.setLoadWithoutConverterWithoutReArrangeStep(true) Review comment: how about set it in driver side and remove 'isDataFrame' of NewDataFrameLoaderRDD ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
QiangCai commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378690929 ########## File path: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ########## @@ -834,4 +843,179 @@ object CommonUtil { displaySize } + def getObjectArrayFromInternalRowAndConvertComplexType(row: InternalRow, + fieldTypes: Seq[DataType], + outputArrayLength: Int): Array[AnyRef] = { + val data = new Array[AnyRef](outputArrayLength) + var i = 0 + val fieldTypesLen = fieldTypes.length + while (i < fieldTypesLen) { + if (!row.isNullAt(i)) { + fieldTypes(i) match { + case StringType => + data(i) = row.getString(i) + case d: DecimalType => + data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal + case arrayType : ArrayType => + data(i) = convertSparkComplexTypeToCarbonObject(row.getArray(i), arrayType) + case structType : StructType => + data(i) = convertSparkComplexTypeToCarbonObject(row.getStruct(i, + structType.fields.length), structType) + case mapType : MapType => + data(i) = convertSparkComplexTypeToCarbonObject(row.getMap(i), mapType) + case other => Review comment: why StringType not go to this 'other' branch? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
QiangCai commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378719858 ########## File path: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ########## @@ -834,4 +843,179 @@ object CommonUtil { displaySize } + def getObjectArrayFromInternalRowAndConvertComplexType(row: InternalRow, + fieldTypes: Seq[DataType], + outputArrayLength: Int): Array[AnyRef] = { + val data = new Array[AnyRef](outputArrayLength) + var i = 0 + val fieldTypesLen = fieldTypes.length + while (i < fieldTypesLen) { + if (!row.isNullAt(i)) { + fieldTypes(i) match { + case StringType => + data(i) = row.getString(i) + case d: DecimalType => + data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal + case arrayType : ArrayType => + data(i) = convertSparkComplexTypeToCarbonObject(row.getArray(i), arrayType) + case structType : StructType => + data(i) = convertSparkComplexTypeToCarbonObject(row.getStruct(i, + structType.fields.length), structType) + case mapType : MapType => + data(i) = convertSparkComplexTypeToCarbonObject(row.getMap(i), mapType) + case other => + data(i) = row.get(i, other) + } + } + i += 1 + } + data + } + + /** + * After converting complex objects to carbon objects, need to convert to byte array + * + * @param row + * @param fields + * @param dataFieldsWithComplexDataType + * @return + */ + def getObjectArrayFromInternalRowAndConvertComplexTypeForGlobalSort( + row: InternalRow, + fields: Seq[StructField], + dataFieldsWithComplexDataType: Map[String, GenericDataType[_]]): Array[AnyRef] = { + val data = new Array[AnyRef](fields.size) + val badRecordLogHolder = new BadRecordLogHolder(); + var i = 0 + val fieldTypesLen = fields.length + while (i < fieldTypesLen) { + if (!row.isNullAt(i)) { + fields(i).dataType match { + case StringType => + data(i) = DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(row.getString(i), + DataTypes.STRING) + case d: DecimalType => + data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal + case arrayType : ArrayType => + val result = convertSparkComplexTypeToCarbonObject(row.get(i, arrayType), arrayType) + // convert carbon complex object to byte array + val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream() + val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray) + dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[ArrayDataType] + .writeByteArray(result.asInstanceOf[ArrayObject], + dataOutputStream, + badRecordLogHolder) + dataOutputStream.close() + data(i) = byteArray.toByteArray.asInstanceOf[AnyRef] Review comment: please extract the method ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
QiangCai commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378720643 ########## File path: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ########## @@ -834,4 +843,179 @@ object CommonUtil { displaySize } + def getObjectArrayFromInternalRowAndConvertComplexType(row: InternalRow, + fieldTypes: Seq[DataType], + outputArrayLength: Int): Array[AnyRef] = { + val data = new Array[AnyRef](outputArrayLength) + var i = 0 + val fieldTypesLen = fieldTypes.length + while (i < fieldTypesLen) { + if (!row.isNullAt(i)) { + fieldTypes(i) match { + case StringType => + data(i) = row.getString(i) + case d: DecimalType => + data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal + case arrayType : ArrayType => + data(i) = convertSparkComplexTypeToCarbonObject(row.getArray(i), arrayType) + case structType : StructType => + data(i) = convertSparkComplexTypeToCarbonObject(row.getStruct(i, + structType.fields.length), structType) + case mapType : MapType => + data(i) = convertSparkComplexTypeToCarbonObject(row.getMap(i), mapType) + case other => + data(i) = row.get(i, other) + } + } + i += 1 + } + data + } + + /** + * After converting complex objects to carbon objects, need to convert to byte array + * + * @param row + * @param fields + * @param dataFieldsWithComplexDataType + * @return + */ + def getObjectArrayFromInternalRowAndConvertComplexTypeForGlobalSort( + row: InternalRow, + fields: Seq[StructField], + dataFieldsWithComplexDataType: Map[String, GenericDataType[_]]): Array[AnyRef] = { + val data = new Array[AnyRef](fields.size) + val badRecordLogHolder = new BadRecordLogHolder(); + var i = 0 + val fieldTypesLen = fields.length + while (i < fieldTypesLen) { + if (!row.isNullAt(i)) { + fields(i).dataType match { + case StringType => + data(i) = DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(row.getString(i), + DataTypes.STRING) + case d: DecimalType => + data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal + case arrayType : ArrayType => + val result = convertSparkComplexTypeToCarbonObject(row.get(i, arrayType), arrayType) + // convert carbon complex object to byte array + val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream() + val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray) + dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[ArrayDataType] + .writeByteArray(result.asInstanceOf[ArrayObject], + dataOutputStream, + badRecordLogHolder) + dataOutputStream.close() + data(i) = byteArray.toByteArray.asInstanceOf[AnyRef] + case structType : StructType => + val result = convertSparkComplexTypeToCarbonObject(row.get(i, structType), structType) + // convert carbon complex object to byte array + val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream() + val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray) + dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[StructDataType] + .writeByteArray(result.asInstanceOf[StructObject], + dataOutputStream, + badRecordLogHolder) + dataOutputStream.close() + data(i) = byteArray.toByteArray.asInstanceOf[AnyRef] + case mapType : MapType => + val result = convertSparkComplexTypeToCarbonObject(row.get(i, mapType), mapType) + // convert carbon complex object to byte array + val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream() + val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray) + dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[ArrayDataType] + .writeByteArray(result.asInstanceOf[ArrayObject], + dataOutputStream, + badRecordLogHolder) + dataOutputStream.close() + data(i) = byteArray.toByteArray.asInstanceOf[AnyRef] Review comment: reuse the method ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
QiangCai commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378725483 ########## File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala ########## @@ -17,85 +17,517 @@ package org.apache.spark.sql.execution.command.management -import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession} -import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan} -import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataCommand} -import org.apache.spark.storage.StorageLevel - -import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} - -case class CarbonInsertIntoCommand( - relation: CarbonDatasourceHadoopRelation, - child: LogicalPlan, - overwrite: Boolean, - partition: Map[String, Option[String]]) +import java.text.SimpleDateFormat +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{AnalysisException, CarbonEnv, CarbonToSparkAdapter, DataFrame, Dataset, Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Literal, NamedExpression} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} +import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.util.CausedBy + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.converter.SparkDataTypeConverterImpl +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema} +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} +import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.events.exception.PreEventException +import org.apache.carbondata.events.OperationContext +import org.apache.carbondata.processing.loading.TableProcessingOperations +import org.apache.carbondata.processing.loading.exception.NoRetryException +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.processing.util.CarbonLoaderUtil +import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory +import org.apache.carbondata.spark.util.CarbonScalaUtil + +/* +* insert into without df, by just using logical plan +* +*/ +case class CarbonInsertIntoCommand(databaseNameOp: Option[String], + tableName: String, + options: Map[String, String], + isOverwriteTable: Boolean, + var logicalPlan: LogicalPlan, + var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(), + var inputSqlString: String = null, + var tableInfoOp: Option[TableInfo] = None, + var internalOptions: Map[String, String] = Map.empty, + var partition: Map[String, Option[String]] = Map.empty, + var operationContext: OperationContext = new OperationContext) extends AtomicRunnableCommand { - var loadCommand: CarbonLoadDataCommand = _ + var table: CarbonTable = _ + + var logicalPartitionRelation: LogicalRelation = _ + + var sizeInBytes: Long = _ + + var currPartitions: util.List[PartitionSpec] = _ + + var parentTablePath: String = _ + + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + var scanResultRdd: RDD[InternalRow] = _ + + var timeStampFormat: SimpleDateFormat = _ + + var dateFormat: SimpleDateFormat = _ + + var finalPartition: Map[String, Option[String]] = Map.empty + + var isInsertIntoWithConverterFlow: Boolean = false + + var dataFrame: DataFrame = _ override def processMetadata(sparkSession: SparkSession): Seq[Row] = { - setAuditTable(relation.carbonTable.getDatabaseName, relation.carbonTable.getTableName) - val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - def containsLimit(plan: LogicalPlan): Boolean = { - plan find { - case limit: GlobalLimit => true - case other => false - } isDefined + if (!tableInfoOp.isDefined) { Review comment: if tableInfoOp is must be defined, no need to use an option variable ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
QiangCai commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378720533 ########## File path: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ########## @@ -834,4 +843,179 @@ object CommonUtil { displaySize } + def getObjectArrayFromInternalRowAndConvertComplexType(row: InternalRow, + fieldTypes: Seq[DataType], + outputArrayLength: Int): Array[AnyRef] = { + val data = new Array[AnyRef](outputArrayLength) + var i = 0 + val fieldTypesLen = fieldTypes.length + while (i < fieldTypesLen) { + if (!row.isNullAt(i)) { + fieldTypes(i) match { + case StringType => + data(i) = row.getString(i) + case d: DecimalType => + data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal + case arrayType : ArrayType => + data(i) = convertSparkComplexTypeToCarbonObject(row.getArray(i), arrayType) + case structType : StructType => + data(i) = convertSparkComplexTypeToCarbonObject(row.getStruct(i, + structType.fields.length), structType) + case mapType : MapType => + data(i) = convertSparkComplexTypeToCarbonObject(row.getMap(i), mapType) + case other => + data(i) = row.get(i, other) + } + } + i += 1 + } + data + } + + /** + * After converting complex objects to carbon objects, need to convert to byte array + * + * @param row + * @param fields + * @param dataFieldsWithComplexDataType + * @return + */ + def getObjectArrayFromInternalRowAndConvertComplexTypeForGlobalSort( + row: InternalRow, + fields: Seq[StructField], + dataFieldsWithComplexDataType: Map[String, GenericDataType[_]]): Array[AnyRef] = { + val data = new Array[AnyRef](fields.size) + val badRecordLogHolder = new BadRecordLogHolder(); + var i = 0 + val fieldTypesLen = fields.length + while (i < fieldTypesLen) { + if (!row.isNullAt(i)) { + fields(i).dataType match { + case StringType => + data(i) = DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(row.getString(i), + DataTypes.STRING) + case d: DecimalType => + data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal + case arrayType : ArrayType => + val result = convertSparkComplexTypeToCarbonObject(row.get(i, arrayType), arrayType) + // convert carbon complex object to byte array + val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream() + val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray) + dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[ArrayDataType] + .writeByteArray(result.asInstanceOf[ArrayObject], + dataOutputStream, + badRecordLogHolder) + dataOutputStream.close() + data(i) = byteArray.toByteArray.asInstanceOf[AnyRef] + case structType : StructType => + val result = convertSparkComplexTypeToCarbonObject(row.get(i, structType), structType) + // convert carbon complex object to byte array + val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream() + val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray) + dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[StructDataType] + .writeByteArray(result.asInstanceOf[StructObject], + dataOutputStream, + badRecordLogHolder) + dataOutputStream.close() + data(i) = byteArray.toByteArray.asInstanceOf[AnyRef] Review comment: reuse the method ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
QiangCai commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378724111 ########## File path: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ########## @@ -953,28 +983,46 @@ object CarbonDataRDDFactory { /** * Execute load process to load from input dataframe + * + * @param sqlContext sql context + * @param dataFrame optional dataframe for insert + * @param scanResultRDD optional internal row rdd for direct insert + * @param carbonLoadModel load model + * @return Return an array that contains all of the elements in NewDataFrameLoaderRDD. */ private def loadDataFrame( sqlContext: SQLContext, dataFrame: Option[DataFrame], + scanResultRDD: Option[RDD[InternalRow]], carbonLoadModel: CarbonLoadModel ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = { try { - val rdd = dataFrame.get.rdd - + val rdd = if (dataFrame.isDefined) { + dataFrame.get.rdd + } else { + scanResultRDD.get + } val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]] { p => DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host) }.distinct.length val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList( nodeNumOfData, sqlContext.sparkContext) - val newRdd = new DataLoadCoalescedRDD[Row](sqlContext.sparkSession, rdd, nodes.toArray - .distinct) - + val newRdd = + if (dataFrame.isDefined) { + new DataLoadCoalescedRDD[Row]( + sqlContext.sparkSession, dataFrame.get.rdd, nodes.toArray.distinct) + } else { + new DataLoadCoalescedRDD[InternalRow]( + sqlContext.sparkSession, + scanResultRDD.get, + nodes.toArray.distinct) + } new NewDataFrameLoaderRDD( sqlContext.sparkSession, new DataLoadResultImpl(), carbonLoadModel, + dataFrame.isDefined, Review comment: no need this parameter if loadDataFrame method better to set carbonLoadModel.setLoadWithoutConverterWithoutReArrangeStep ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
QiangCai commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378728896 ########## File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala ########## @@ -17,85 +17,517 @@ package org.apache.spark.sql.execution.command.management -import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession} -import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan} -import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataCommand} -import org.apache.spark.storage.StorageLevel - -import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} - -case class CarbonInsertIntoCommand( - relation: CarbonDatasourceHadoopRelation, - child: LogicalPlan, - overwrite: Boolean, - partition: Map[String, Option[String]]) +import java.text.SimpleDateFormat +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{AnalysisException, CarbonEnv, CarbonToSparkAdapter, DataFrame, Dataset, Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Literal, NamedExpression} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} +import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.util.CausedBy + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.converter.SparkDataTypeConverterImpl +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema} +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} +import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.events.exception.PreEventException +import org.apache.carbondata.events.OperationContext +import org.apache.carbondata.processing.loading.TableProcessingOperations +import org.apache.carbondata.processing.loading.exception.NoRetryException +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.processing.util.CarbonLoaderUtil +import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory +import org.apache.carbondata.spark.util.CarbonScalaUtil + +/* +* insert into without df, by just using logical plan +* +*/ +case class CarbonInsertIntoCommand(databaseNameOp: Option[String], + tableName: String, + options: Map[String, String], + isOverwriteTable: Boolean, + var logicalPlan: LogicalPlan, + var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(), + var inputSqlString: String = null, + var tableInfoOp: Option[TableInfo] = None, + var internalOptions: Map[String, String] = Map.empty, + var partition: Map[String, Option[String]] = Map.empty, + var operationContext: OperationContext = new OperationContext) extends AtomicRunnableCommand { - var loadCommand: CarbonLoadDataCommand = _ + var table: CarbonTable = _ + + var logicalPartitionRelation: LogicalRelation = _ + + var sizeInBytes: Long = _ + + var currPartitions: util.List[PartitionSpec] = _ + + var parentTablePath: String = _ + + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + var scanResultRdd: RDD[InternalRow] = _ + + var timeStampFormat: SimpleDateFormat = _ + + var dateFormat: SimpleDateFormat = _ + + var finalPartition: Map[String, Option[String]] = Map.empty + + var isInsertIntoWithConverterFlow: Boolean = false + + var dataFrame: DataFrame = _ override def processMetadata(sparkSession: SparkSession): Seq[Row] = { - setAuditTable(relation.carbonTable.getDatabaseName, relation.carbonTable.getTableName) - val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - def containsLimit(plan: LogicalPlan): Boolean = { - plan find { - case limit: GlobalLimit => true - case other => false - } isDefined + if (!tableInfoOp.isDefined) { + throw new RuntimeException(" table info must be present when logical relation exist") } - + // If logical plan is unresolved, need to convert it to resolved. + dataFrame = Dataset.ofRows(sparkSession, logicalPlan) + logicalPlan = dataFrame.queryExecution.analyzed + // Currently projection re-ordering is based on schema ordinal, + // for some scenarios in alter table scenario, schema ordinal logic cannot be applied. + // So, sending it to old flow + // TODO: Handle alter table in future, this also must use new flow. + if (CarbonProperties.isBadRecordHandlingEnabledForInsert || + isAlteredSchema(tableInfoOp.get.getFactTable)) { + isInsertIntoWithConverterFlow = true + } + if (isInsertIntoWithConverterFlow) { + return Seq.empty + } + setAuditTable(CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession), tableName) ThreadLocalSessionInfo .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf()) - val isPersistEnabledUserValue = CarbonProperties.getInstance - .getProperty(CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED, - CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED_DEFAULT) - val isPersistRequired = - isPersistEnabledUserValue.equalsIgnoreCase("true") || containsLimit(child) - val df = - if (isPersistRequired) { - LOGGER.info("Persist enabled for Insert operation") - Dataset.ofRows(sparkSession, child).persist( - StorageLevel.fromString( - CarbonProperties.getInstance.getInsertIntoDatasetStorageLevel)) + val (sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition) = CommonLoadUtils + .processMetadataCommon( + sparkSession, + databaseNameOp, + tableName, + tableInfoOp, + partition) + this.sizeInBytes = sizeInBytes + this.table = table + this.logicalPartitionRelation = logicalPartitionRelation + this.finalPartition = finalPartition + setAuditTable(dbName, tableName) + Seq.empty + } + + override def processData(sparkSession: SparkSession): Seq[Row] = { + if (isInsertIntoWithConverterFlow) { + return CarbonInsertIntoWithDf( + databaseNameOp = databaseNameOp, + tableName = tableName, + options = options, + isOverwriteTable = isOverwriteTable, + dimFilesPath = dimFilesPath, + dataFrame = dataFrame, + inputSqlString = inputSqlString, + updateModel = None, + tableInfoOp = tableInfoOp, + internalOptions = internalOptions, + partition = partition).process(sparkSession) + } + val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession) + val hadoopConf = sparkSession.sessionState.newHadoopConf() + CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false") + val factPath = "" + currPartitions = CommonLoadUtils.getCurrentParitions(sparkSession, table) + CommonLoadUtils.setNumberOfCoresWhileLoading(sparkSession) + val optionsFinal: util.Map[String, String] = CommonLoadUtils.getFinalLoadOptions(table, options) + val carbonLoadModel: CarbonLoadModel = CommonLoadUtils.prepareLoadModel( + hadoopConf = hadoopConf, + factPath = factPath, + optionsFinal = optionsFinal, + parentTablePath = parentTablePath, + table = table, + isDataFrame = true, + internalOptions = internalOptions, + partition = partition, + options = options) + val (tf, df) = CommonLoadUtils.getTimeAndDateFormatFromLoadModel(carbonLoadModel) + timeStampFormat = tf + dateFormat = df + val partitionInfo = tableInfoOp.get.getFactTable.getPartitionInfo + val partitionColumnSchema = + if (partitionInfo != null && partitionInfo.getColumnSchemaList.size() != 0) { + partitionInfo.getColumnSchemaList.asScala + } else { + null + } + val convertedStaticPartition = getConvertedStaticPartitionMap(partitionColumnSchema) + val (reArrangedIndex, selectedColumnSchema) = getReArrangedIndexAndSelectedSchema( + tableInfoOp.get, + partitionColumnSchema) + val newLogicalPlan = getReArrangedLogicalPlan( + reArrangedIndex, + selectedColumnSchema, + convertedStaticPartition) + scanResultRdd = sparkSession.sessionState.executePlan(newLogicalPlan).toRdd + if (logicalPartitionRelation != null) { + logicalPartitionRelation = + getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalPartitionRelation) + } + // Delete stale segment folders that are not in table status but are physically present in + // the Fact folder + LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName") + TableProcessingOperations.deletePartialLoadDataIfExist(table, false) + var isUpdateTableStatusRequired = false + val uuid = "" + try { + val (tableDataMaps, dataMapOperationContext) = + CommonLoadUtils.firePreLoadEvents(sparkSession, + carbonLoadModel, + uuid, + table, + isOverwriteTable, + operationContext) + // Clean up the old invalid segment data before creating a new entry for new load. + SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions) + // add the start entry for the new load in the table status file + if (!table.isHivePartitionTable) { + CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta( + carbonLoadModel, + isOverwriteTable) + isUpdateTableStatusRequired = true + } + if (isOverwriteTable) { + LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress") + } + // Create table and metadata folders if not exist + if (carbonLoadModel.isCarbonTransactionalTable) { + val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath) + if (!FileFactory.isFileExist(metadataDirectoryPath)) { + FileFactory.mkdirs(metadataDirectoryPath) + } } else { - Dataset.ofRows(sparkSession, child) + carbonLoadModel.setSegmentId(System.nanoTime().toString) } - val header = relation.tableSchema.get.fields.map(_.name).mkString(",") - loadCommand = CarbonLoadDataCommand( - databaseNameOp = Some(relation.carbonRelation.databaseName), - tableName = relation.carbonRelation.tableName, - factPathFromUser = null, - dimFilesPath = Seq(), - options = scala.collection.immutable.Map("fileheader" -> header), - isOverwriteTable = overwrite, - inputSqlString = null, - dataFrame = Some(df), - updateModel = None, - tableInfoOp = None, - internalOptions = Map.empty, - partition = partition) - val load = loadCommand.processMetadata(sparkSession) - if (isPersistRequired) { - df.unpersist() - } - load + val partitionStatus = SegmentStatus.SUCCESS + val loadParams = CarbonLoadParams(sparkSession, + tableName, + sizeInBytes, + isOverwriteTable, + carbonLoadModel, + hadoopConf, + logicalPartitionRelation, + dateFormat, + timeStampFormat, + options, + finalPartition, + currPartitions, + partitionStatus, + None, + Some(scanResultRdd), + None, + operationContext) + LOGGER.info("Sort Scope : " + carbonLoadModel.getSortScope) + val (rows, loadResult) = insertData(loadParams) + val info = CommonLoadUtils.makeAuditInfo(loadResult) + setAuditInfo(info) + CommonLoadUtils.firePostLoadEvents(sparkSession, + carbonLoadModel, + tableDataMaps, + dataMapOperationContext, + table, + operationContext) + } catch { + case CausedBy(ex: NoRetryException) => + // update the load entry in table status file for changing the status to marked for delete + if (isUpdateTableStatusRequired) { + CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid) + } + LOGGER.error(s"Dataload failure for $dbName.$tableName", ex) + throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}") + // In case of event related exception + case preEventEx: PreEventException => + LOGGER.error(s"Dataload failure for $dbName.$tableName", preEventEx) + throw new AnalysisException(preEventEx.getMessage) + case ex: Exception => + LOGGER.error(ex) + // update the load entry in table status file for changing the status to marked for delete + if (isUpdateTableStatusRequired) { + CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid) + } + throw ex + } + Seq.empty } - override def processData(sparkSession: SparkSession): Seq[Row] = { - if (null != loadCommand) { - val rows = loadCommand.processData(sparkSession) - setAuditInfo(loadCommand.auditInfo) - rows + + def getReArrangedLogicalPlan( + reArrangedIndex: Seq[Int], + selectedColumnSchema: Seq[ColumnSchema], + convertedStaticPartition: mutable.Map[String, AnyRef]): LogicalPlan = { + var processedProject: Boolean = false + // check first node is the projection or not + logicalPlan match { + case _: Project => + // project is already present as first node + case _ => + // If project is not present, add the projection to re-arrange it + logicalPlan = Project(logicalPlan.output, logicalPlan) + } + // Re-arrange the project as per columnSchema + val newLogicalPlan = logicalPlan.transformDown { + // case logicalRelation: LogicalRelation => + // getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalRelation) + // case hiveRelation: HiveTableRelation => + // getReArrangedSchemaHiveRelation(reArrangedIndex, hiveRelation) + case p: Project => + var oldProjectionList = p.projectList + if (!processedProject) { Review comment: move it to the case code ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
QiangCai commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378722658 ########## File path: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ########## @@ -358,19 +359,41 @@ object CarbonDataRDDFactory { } } } else { - status = if (dataFrame.isEmpty && isSortTable && - carbonLoadModel.getRangePartitionColumn != null && - (sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT) || - sortScope.equals(SortScopeOptions.SortScope.LOCAL_SORT))) { - DataLoadProcessBuilderOnSpark - .loadDataUsingRangeSort(sqlContext.sparkSession, carbonLoadModel, hadoopConf) - } else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) { - DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkSession, - dataFrame, carbonLoadModel, hadoopConf) - } else if (dataFrame.isDefined) { - loadDataFrame(sqlContext, dataFrame, carbonLoadModel) + status = if (scanResultRdd.isDefined) { Review comment: now the if code block becomes bigger, better to cut some branches. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
Free forum by Nabble | Edit this page |