jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376195809 ########## File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadParams.scala ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command.management + +import java.text.SimpleDateFormat +import java.util + +import scala.collection.mutable + +import org.apache.hadoop.conf.Configuration +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.command.UpdateTableModel +import org.apache.spark.sql.execution.datasources.LogicalRelation + +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.statusmanager.SegmentStatus +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.events.OperationContext +import org.apache.carbondata.processing.loading.model.CarbonLoadModel + +/* +* intermediate object to pass between load functions +*/ +case class CarbonLoadParams(sparkSession: SparkSession, Review comment: move `sparkSession: SparkSession,` to next line ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376196114 ########## File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadParams.scala ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command.management + +import java.text.SimpleDateFormat +import java.util + +import scala.collection.mutable + +import org.apache.hadoop.conf.Configuration +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.command.UpdateTableModel +import org.apache.spark.sql.execution.datasources.LogicalRelation + +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.statusmanager.SegmentStatus +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.events.OperationContext +import org.apache.carbondata.processing.loading.model.CarbonLoadModel + +/* +* intermediate object to pass between load functions +*/ +case class CarbonLoadParams(sparkSession: SparkSession, Review comment: Will this be refactored later? We already have `CarbonLoadModel` to pass the parameters ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376196174 ########## File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala ########## @@ -316,16 +315,15 @@ private[sql] case class CarbonProjectForUpdateCommand( val header = getHeader(carbonRelation, plan) - CarbonLoadDataCommand( + CarbonInsertIntoWithDf( Some(carbonRelation.identifier.getCarbonTableIdentifier.getDatabaseName), carbonRelation.identifier.getCarbonTableIdentifier.getTableName, - null, - Seq(), Map(("fileheader" -> header)), false, null, - Some(dataFrame), - Some(updateTableModel)).run(sparkSession) + dataFrame, Review comment: please add parameter names ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376196366 ########## File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala ########## @@ -76,11 +76,16 @@ case class CarbonCreateTableAsSelectCommand( .createCarbonDataSourceHadoopRelation(sparkSession, TableIdentifier(tableName, Option(dbName))) // execute command to load data into carbon table - loadCommand = CarbonInsertIntoCommand( - carbonDataSourceHadoopRelation, - query, - overwrite = false, - partition = Map.empty) + loadCommand = CarbonInsertIntoCommand(Some(carbonDataSourceHadoopRelation Review comment: change ``` Some(carbonDataSourceHadoopRelation .carbonRelation .databaseName), ``` to same line ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376196613 ########## File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala ########## @@ -509,30 +514,17 @@ private class CarbonOutputWriter(path: String, // TODO Implement writesupport interface to support writing Row directly to recordwriter def writeCarbon(row: InternalRow): Unit = { - val data = new Array[AnyRef](fieldTypes.length + partitionData.length) - var i = 0 - val fieldTypesLen = fieldTypes.length - while (i < fieldTypesLen) { - if (!row.isNullAt(i)) { - fieldTypes(i) match { - case StringType => - data(i) = row.getString(i) - case d: DecimalType => - data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal - case other => - data(i) = row.get(i, other) - } - } - i += 1 - } + val totalLength = fieldTypes.length + partitionData.length Review comment: Is this number of columns? suggest to change to `numColumns` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376196806 ########## File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala ########## @@ -509,30 +514,17 @@ private class CarbonOutputWriter(path: String, // TODO Implement writesupport interface to support writing Row directly to recordwriter def writeCarbon(row: InternalRow): Unit = { - val data = new Array[AnyRef](fieldTypes.length + partitionData.length) - var i = 0 - val fieldTypesLen = fieldTypes.length - while (i < fieldTypesLen) { - if (!row.isNullAt(i)) { - fieldTypes(i) match { - case StringType => - data(i) = row.getString(i) - case d: DecimalType => - data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal - case other => - data(i) = row.get(i, other) - } - } - i += 1 - } + val totalLength = fieldTypes.length + partitionData.length + val data: Array[AnyRef] = CommonUtil.getObjectArrayFromInternalRowAndConvertComplexType(row, + fieldTypes, + totalLength) if (partitionData.length > 0) { - System.arraycopy(partitionData, 0, data, fieldTypesLen, partitionData.length) + System.arraycopy(partitionData, 0, data, fieldTypes.length, partitionData.length) Review comment: Is `fieldTypes.length` equal to number of non-partition columns? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376197122 ########## File path: processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java ########## @@ -32,7 +33,7 @@ * Generic DataType interface which will be used while data loading for complex types like Array & * Struct */ -public interface GenericDataType<T> { +public interface GenericDataType<T> extends Serializable { Review comment: There are some logger in subclass, make them transient ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376197271 ########## File path: processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java ########## @@ -110,10 +113,10 @@ private AbstractDataLoadProcessorStep buildInternalForNoSort(CarbonIterator[] in */ private AbstractDataLoadProcessorStep buildInternalWithNoConverter( CarbonIterator[] inputIterators, CarbonDataLoadConfiguration configuration, - SortScopeOptions.SortScope sortScope) { + SortScopeOptions.SortScope sortScope, boolean withoutReArrange) { Review comment: can we remove all re-arrange in all flow? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376334978 ########## File path: processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java ########## @@ -334,6 +356,42 @@ private CarbonRowBatch getBatch() { return newData; } + private Object[] convertToNoDictionaryToBytesWithoutReArrange(Object[] data, Review comment: I checked , This is different. In original method when ordered data is present it is based on no dictionary mapping, I don't want to use no dictionary mapping. complex type logic is same, may be I will extract a common method for that ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376337691 ########## File path: integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java ########## @@ -76,9 +76,21 @@ public static boolean isBlockWithoutBlockletInfoExists(List<CarbonInputSplit> sp } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(carbonDataType)) { return DataTypes.createDecimalType(); } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.TIMESTAMP) { + if (forGlobalSort) { Review comment: It is required, In MeasureFieldConverterImpl, we convert it to long value in getNoDictionaryValueBasedOnDataType method ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376338167 ########## File path: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ########## @@ -179,6 +182,93 @@ object DataLoadProcessBuilderOnSpark { updateLoadStatus(model, partialSuccessAccum) } + def insertDataUsingGlobalSortWithInternalRow(sparkSession: SparkSession, Review comment: ok, I follow this guideline. Auto refactor will keep in the same line. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376339644 ########## File path: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ########## @@ -834,4 +843,179 @@ object CommonUtil { displaySize } + def getObjectArrayFromInternalRowAndConvertComplexType(row: InternalRow, + fieldTypes: Seq[DataType], + outputArrayLength: Int): Array[AnyRef] = { + val data = new Array[AnyRef](outputArrayLength) + var i = 0 + val fieldTypesLen = fieldTypes.length + while (i < fieldTypesLen) { + if (!row.isNullAt(i)) { + fieldTypes(i) match { + case StringType => + data(i) = row.getString(i) + case d: DecimalType => + data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal + case arrayType : ArrayType => + data(i) = convertSparkComplexTypeToCarbonObject(row.getArray(i), arrayType) + case structType : StructType => + data(i) = convertSparkComplexTypeToCarbonObject(row.getStruct(i, + structType.fields.length), structType) + case mapType : MapType => + data(i) = convertSparkComplexTypeToCarbonObject(row.getMap(i), mapType) + case other => + data(i) = row.get(i, other) + } + } + i += 1 + } + data + } + + /** + * After converting complex objects to carbon objects, need to convert to byte array + * + * @param row + * @param fields + * @param dataFieldsWithComplexDataType + * @return + */ + def getObjectArrayFromInternalRowAndConvertComplexTypeForGlobalSort( + row: InternalRow, + fields: Seq[StructField], + dataFieldsWithComplexDataType: Map[String, GenericDataType[_]]): Array[AnyRef] = { Review comment: No, not always. Primitive measure type will be object. This converting internal row to object array. Only in the write step. It will be stored as byte array ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376339989 ########## File path: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ########## @@ -179,6 +182,93 @@ object DataLoadProcessBuilderOnSpark { updateLoadStatus(model, partialSuccessAccum) } + def insertDataUsingGlobalSortWithInternalRow(sparkSession: SparkSession, + scanResultRDD : RDD[InternalRow], + model: CarbonLoadModel, + hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = { + val originRDD = scanResultRDD + + val sc = sparkSession.sparkContext + val modelBroadcast = sc.broadcast(model) + val partialSuccessAccum = sc.longAccumulator("Partial Success Accumulator") + val sortStepRowCounter = sc.longAccumulator("Sort Processor Accumulator") + val writeStepRowCounter = sc.longAccumulator("Write Processor Accumulator") + + hadoopConf + .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName) + + val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf) + val configuration = DataLoadProcessBuilder.createConfiguration(model) + + // 1. Convert internalRow to object array + val fields = Util + .convertToSparkSchemaFromColumnSchema(model.getCarbonDataLoadSchema.getCarbonTable, true) + .fields + .toSeq + + val dataTypes = fields.map(field => field.dataType) + val map: mutable.Map[String, GenericDataType[_]] = mutable.Map[String, GenericDataType[_]]() + CommonUtil.convertComplexDataType(map, configuration) + val rdd = originRDD.map { internalRow => + CommonUtil.getObjectArrayFromInternalRowAndConvertComplexTypeForGlobalSort(internalRow, + fields, + map) + } + + // 2. sort + var numPartitions = CarbonDataProcessorUtil.getGlobalSortPartitions( + configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS)) + if (numPartitions <= 0) { + numPartitions = originRDD.partitions.length + } + // Because if the number of partitions greater than 1, there will be action operator + // (sample) in + // sortBy operator. So here we cache the rdd to avoid do input and convert again. + if (numPartitions > 1) { + rdd.persist(StorageLevel.fromString( + CarbonProperties.getInstance().getGlobalSortRddStorageLevel())) + } + val sortColumnsLength = model.getCarbonDataLoadSchema.getCarbonTable.getSortColumns.size() + val sortColumnDataTypes = dataTypes.take(sortColumnsLength) + val rowComparator = GlobalSortHelper.generateRowComparator(sortColumnDataTypes) + val sortRDD = rdd.sortBy(x => getKey(x, sortColumnsLength, sortColumnDataTypes), Review comment: ok ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376340053 ########## File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command.management + +import java.text.SimpleDateFormat +import java.util + +import org.apache.spark.sql.{AnalysisException, CarbonUtils, DataFrame, Dataset, Row, SparkSession} +import org.apache.spark.sql.execution.command.{DataLoadTableFileMapping, UpdateTableModel} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, SparkUtil} + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} +import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.events.OperationContext +import org.apache.carbondata.events.exception.PreEventException +import org.apache.carbondata.processing.loading.TableProcessingOperations +import org.apache.carbondata.processing.loading.exception.NoRetryException +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil} +import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory + +/* +* insert into with df, doesn't use logical plan +* +*/ +case class CarbonInsertIntoWithDf(databaseNameOp: Option[String], + tableName: String, + options: Map[String, String], + isOverwriteTable: Boolean, + var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(), + var dataFrame: DataFrame, + var inputSqlString: String = null, + var updateModel: Option[UpdateTableModel] = None, + var tableInfoOp: Option[TableInfo] = None, + var internalOptions: Map[String, String] = Map.empty, + var partition: Map[String, Option[String]] = Map.empty, + var operationContext: OperationContext = new OperationContext) { + + var table: CarbonTable = _ + + var logicalPartitionRelation: LogicalRelation = _ + + var sizeInBytes: Long = _ + + var currPartitions: util.List[PartitionSpec] = _ + + var parentTablePath: String = _ + + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + var finalPartition: Map[String, Option[String]] = Map.empty + + var timeStampFormat: SimpleDateFormat = _ + + var dateFormat: SimpleDateFormat = _ + + def process(sparkSession: SparkSession): Seq[Row] = { + ThreadLocalSessionInfo + .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf()) + val (sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition) = CommonLoadUtils + .processMetadataCommon( + sparkSession, + databaseNameOp, + tableName, + tableInfoOp, + partition) + this.sizeInBytes = sizeInBytes + this.table = table + this.logicalPartitionRelation = logicalPartitionRelation + this.finalPartition = finalPartition + val carbonProperty: CarbonProperties = CarbonProperties.getInstance() + val hadoopConf = sparkSession.sessionState.newHadoopConf() + carbonProperty.addProperty("zookeeper.enable.lock", "false") + val factPath = "" + currPartitions = CommonLoadUtils.getCurrentParitions(sparkSession, table) + CommonLoadUtils.setNumberOfCoresWhileLoading(sparkSession, carbonProperty) + val optionsFinal: util.Map[String, String] = + CommonLoadUtils.getFinalLoadOptions( + carbonProperty, table, options) + val carbonLoadModel: CarbonLoadModel = CommonLoadUtils.prepareLoadModel( + hadoopConf, + factPath, + optionsFinal, + parentTablePath, + table, + isDataFrame = true, + internalOptions, + finalPartition, + options) + val (tf, df) = CommonLoadUtils.getTimeAndDateFormatFromLoadModel( + carbonLoadModel) + timeStampFormat = tf + dateFormat = df + // Delete stale segment folders that are not in table status but are physically present in + // the Fact folder + LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName") + TableProcessingOperations.deletePartialLoadDataIfExist(table, false) + var isUpdateTableStatusRequired = false + val uuid = "" + try { + val (tableDataMaps, dataMapOperationContext) = + CommonLoadUtils.firePreLoadEvents(sparkSession, + carbonLoadModel, + uuid, + table, + isOverwriteTable, + operationContext) + // First system has to partition the data first and then call the load data + LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)") Review comment: ok ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376341490 ########## File path: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ########## @@ -957,24 +987,36 @@ object CarbonDataRDDFactory { private def loadDataFrame( sqlContext: SQLContext, dataFrame: Option[DataFrame], + scanResultRDD: Option[RDD[InternalRow]], Review comment: ok ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376342021 ########## File path: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ########## @@ -957,24 +987,36 @@ object CarbonDataRDDFactory { private def loadDataFrame( sqlContext: SQLContext, dataFrame: Option[DataFrame], + scanResultRDD: Option[RDD[InternalRow]], carbonLoadModel: CarbonLoadModel ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = { try { - val rdd = dataFrame.get.rdd - + val rdd = if (dataFrame.isDefined) { + dataFrame.get.rdd + } else { + scanResultRDD.get + } val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]] { p => DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host) }.distinct.length val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList( nodeNumOfData, sqlContext.sparkContext) - val newRdd = new DataLoadCoalescedRDD[Row](sqlContext.sparkSession, rdd, nodes.toArray - .distinct) - + val newRdd = + if (dataFrame.isDefined) { + new DataLoadCoalescedRDD[Row](sqlContext.sparkSession, dataFrame.get.rdd, nodes.toArray + .distinct) + } else { + new DataLoadCoalescedRDD[InternalRow](sqlContext.sparkSession, Review comment: ok ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376343994 ########## File path: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ########## @@ -957,24 +987,36 @@ object CarbonDataRDDFactory { private def loadDataFrame( sqlContext: SQLContext, dataFrame: Option[DataFrame], + scanResultRDD: Option[RDD[InternalRow]], carbonLoadModel: CarbonLoadModel ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = { try { - val rdd = dataFrame.get.rdd - + val rdd = if (dataFrame.isDefined) { + dataFrame.get.rdd + } else { + scanResultRDD.get + } val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]] { p => DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host) }.distinct.length val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList( nodeNumOfData, sqlContext.sparkContext) - val newRdd = new DataLoadCoalescedRDD[Row](sqlContext.sparkSession, rdd, nodes.toArray - .distinct) - + val newRdd = + if (dataFrame.isDefined) { + new DataLoadCoalescedRDD[Row](sqlContext.sparkSession, dataFrame.get.rdd, nodes.toArray + .distinct) + } else { + new DataLoadCoalescedRDD[InternalRow](sqlContext.sparkSession, + scanResultRDD.get, + nodes.toArray + .distinct) Review comment: yes, this flow is for local sort and no sort. Have plan to send no sort to global sort plan. local sort, one task one node. Hence this logic ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376346935 ########## File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala ########## @@ -17,85 +17,518 @@ package org.apache.spark.sql.execution.command.management -import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession} -import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan} -import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataCommand} -import org.apache.spark.storage.StorageLevel - -import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} - -case class CarbonInsertIntoCommand( - relation: CarbonDatasourceHadoopRelation, - child: LogicalPlan, - overwrite: Boolean, - partition: Map[String, Option[String]]) +import java.text.SimpleDateFormat +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{AnalysisException, CarbonEnv, DataFrame, Dataset, Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Literal, NamedExpression} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} +import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.util.CausedBy + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.converter.SparkDataTypeConverterImpl +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema} +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} +import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.events.exception.PreEventException +import org.apache.carbondata.events.OperationContext +import org.apache.carbondata.processing.loading.TableProcessingOperations +import org.apache.carbondata.processing.loading.exception.NoRetryException +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.processing.util.CarbonLoaderUtil +import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory +import org.apache.carbondata.spark.util.CarbonScalaUtil + +/* +* insert into without df, by just using logical plan +* +*/ +case class CarbonInsertIntoCommand(databaseNameOp: Option[String], + tableName: String, + options: Map[String, String], + isOverwriteTable: Boolean, + var logicalPlan: LogicalPlan, + var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(), + var inputSqlString: String = null, + var tableInfoOp: Option[TableInfo] = None, + var internalOptions: Map[String, String] = Map.empty, + var partition: Map[String, Option[String]] = Map.empty, + var operationContext: OperationContext = new OperationContext) extends AtomicRunnableCommand { - var loadCommand: CarbonLoadDataCommand = _ + var table: CarbonTable = _ + + var logicalPartitionRelation: LogicalRelation = _ + + var sizeInBytes: Long = _ + + var currPartitions: util.List[PartitionSpec] = _ + + var parentTablePath: String = _ + + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + var scanResultRdd: RDD[InternalRow] = _ + + var timeStampFormat: SimpleDateFormat = _ + + var dateFormat: SimpleDateFormat = _ + + var finalPartition: Map[String, Option[String]] = Map.empty + + var isInsertIntoWithConverterFlow: Boolean = false + + var dataFrame: DataFrame = _ override def processMetadata(sparkSession: SparkSession): Seq[Row] = { - setAuditTable(relation.carbonTable.getDatabaseName, relation.carbonTable.getTableName) - val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - def containsLimit(plan: LogicalPlan): Boolean = { - plan find { - case limit: GlobalLimit => true - case other => false - } isDefined + if (!tableInfoOp.isDefined) { + throw new RuntimeException(" table info must be present when logical relation exist") } - + // If logical plan is unresolved, need to convert it to resolved. + dataFrame = Dataset.ofRows(sparkSession, logicalPlan) + logicalPlan = dataFrame.queryExecution.analyzed + var isInsertFromTable = false + logicalPlan.collect { + case _: LogicalRelation => + isInsertFromTable = true + } + // Currently projection re-ordering is based on schema ordinal, + // for some scenarios in alter table scenario, schema ordinal logic cannot be applied. + // So, sending it to old flow + // TODO: Handle this in future, this must use new flow. + if (!isInsertFromTable || isAlteredSchema(tableInfoOp.get.getFactTable)) { + isInsertIntoWithConverterFlow = true + } + if (isInsertIntoWithConverterFlow) { + return Seq.empty + } + setAuditTable(CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession), tableName) ThreadLocalSessionInfo .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf()) - val isPersistEnabledUserValue = CarbonProperties.getInstance - .getProperty(CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED, - CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED_DEFAULT) - val isPersistRequired = - isPersistEnabledUserValue.equalsIgnoreCase("true") || containsLimit(child) - val df = - if (isPersistRequired) { - LOGGER.info("Persist enabled for Insert operation") - Dataset.ofRows(sparkSession, child).persist( - StorageLevel.fromString( - CarbonProperties.getInstance.getInsertIntoDatasetStorageLevel)) + val (sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition) = CommonLoadUtils + .processMetadataCommon( + sparkSession, + databaseNameOp, + tableName, + tableInfoOp, + partition) + this.sizeInBytes = sizeInBytes + this.table = table + this.logicalPartitionRelation = logicalPartitionRelation + this.finalPartition = finalPartition + setAuditTable(dbName, tableName) + Seq.empty + } + + override def processData(sparkSession: SparkSession): Seq[Row] = { + if (isInsertIntoWithConverterFlow) { + return CarbonInsertIntoWithDf( + databaseNameOp, Review comment: ok ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376347781 ########## File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala ########## @@ -17,85 +17,518 @@ package org.apache.spark.sql.execution.command.management -import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession} -import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan} -import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataCommand} -import org.apache.spark.storage.StorageLevel - -import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} - -case class CarbonInsertIntoCommand( - relation: CarbonDatasourceHadoopRelation, - child: LogicalPlan, - overwrite: Boolean, - partition: Map[String, Option[String]]) +import java.text.SimpleDateFormat +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{AnalysisException, CarbonEnv, DataFrame, Dataset, Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Literal, NamedExpression} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} +import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.util.CausedBy + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.converter.SparkDataTypeConverterImpl +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema} +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} +import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.events.exception.PreEventException +import org.apache.carbondata.events.OperationContext +import org.apache.carbondata.processing.loading.TableProcessingOperations +import org.apache.carbondata.processing.loading.exception.NoRetryException +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.processing.util.CarbonLoaderUtil +import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory +import org.apache.carbondata.spark.util.CarbonScalaUtil + +/* +* insert into without df, by just using logical plan +* +*/ +case class CarbonInsertIntoCommand(databaseNameOp: Option[String], + tableName: String, + options: Map[String, String], + isOverwriteTable: Boolean, + var logicalPlan: LogicalPlan, + var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(), + var inputSqlString: String = null, + var tableInfoOp: Option[TableInfo] = None, + var internalOptions: Map[String, String] = Map.empty, + var partition: Map[String, Option[String]] = Map.empty, + var operationContext: OperationContext = new OperationContext) extends AtomicRunnableCommand { - var loadCommand: CarbonLoadDataCommand = _ + var table: CarbonTable = _ + + var logicalPartitionRelation: LogicalRelation = _ + + var sizeInBytes: Long = _ + + var currPartitions: util.List[PartitionSpec] = _ + + var parentTablePath: String = _ + + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + var scanResultRdd: RDD[InternalRow] = _ + + var timeStampFormat: SimpleDateFormat = _ + + var dateFormat: SimpleDateFormat = _ + + var finalPartition: Map[String, Option[String]] = Map.empty + + var isInsertIntoWithConverterFlow: Boolean = false + + var dataFrame: DataFrame = _ override def processMetadata(sparkSession: SparkSession): Seq[Row] = { - setAuditTable(relation.carbonTable.getDatabaseName, relation.carbonTable.getTableName) - val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - def containsLimit(plan: LogicalPlan): Boolean = { - plan find { - case limit: GlobalLimit => true - case other => false - } isDefined + if (!tableInfoOp.isDefined) { + throw new RuntimeException(" table info must be present when logical relation exist") } - + // If logical plan is unresolved, need to convert it to resolved. + dataFrame = Dataset.ofRows(sparkSession, logicalPlan) + logicalPlan = dataFrame.queryExecution.analyzed + var isInsertFromTable = false + logicalPlan.collect { + case _: LogicalRelation => + isInsertFromTable = true + } + // Currently projection re-ordering is based on schema ordinal, + // for some scenarios in alter table scenario, schema ordinal logic cannot be applied. + // So, sending it to old flow + // TODO: Handle this in future, this must use new flow. + if (!isInsertFromTable || isAlteredSchema(tableInfoOp.get.getFactTable)) { + isInsertIntoWithConverterFlow = true + } + if (isInsertIntoWithConverterFlow) { + return Seq.empty + } + setAuditTable(CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession), tableName) ThreadLocalSessionInfo .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf()) - val isPersistEnabledUserValue = CarbonProperties.getInstance - .getProperty(CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED, - CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED_DEFAULT) - val isPersistRequired = - isPersistEnabledUserValue.equalsIgnoreCase("true") || containsLimit(child) - val df = - if (isPersistRequired) { - LOGGER.info("Persist enabled for Insert operation") - Dataset.ofRows(sparkSession, child).persist( - StorageLevel.fromString( - CarbonProperties.getInstance.getInsertIntoDatasetStorageLevel)) + val (sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition) = CommonLoadUtils + .processMetadataCommon( + sparkSession, + databaseNameOp, + tableName, + tableInfoOp, + partition) + this.sizeInBytes = sizeInBytes + this.table = table + this.logicalPartitionRelation = logicalPartitionRelation + this.finalPartition = finalPartition + setAuditTable(dbName, tableName) + Seq.empty + } + + override def processData(sparkSession: SparkSession): Seq[Row] = { + if (isInsertIntoWithConverterFlow) { + return CarbonInsertIntoWithDf( + databaseNameOp, + tableName, + options, + isOverwriteTable, + dimFilesPath, + dataFrame, + inputSqlString, + None, + tableInfoOp, + internalOptions, + partition).process(sparkSession) + } + val carbonProperty: CarbonProperties = CarbonProperties.getInstance() + val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession) + val hadoopConf = sparkSession.sessionState.newHadoopConf() + carbonProperty.addProperty("zookeeper.enable.lock", "false") + val factPath = "" + currPartitions = CommonLoadUtils.getCurrentParitions(sparkSession, table) + CommonLoadUtils.setNumberOfCoresWhileLoading(sparkSession, carbonProperty) + val optionsFinal: util.Map[String, String] = + CommonLoadUtils.getFinalLoadOptions( + carbonProperty, table, options) + val carbonLoadModel: CarbonLoadModel = CommonLoadUtils.prepareLoadModel( Review comment: ok ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376348551 ########## File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala ########## @@ -17,85 +17,518 @@ package org.apache.spark.sql.execution.command.management -import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession} -import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan} -import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataCommand} -import org.apache.spark.storage.StorageLevel - -import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} - -case class CarbonInsertIntoCommand( - relation: CarbonDatasourceHadoopRelation, - child: LogicalPlan, - overwrite: Boolean, - partition: Map[String, Option[String]]) +import java.text.SimpleDateFormat +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{AnalysisException, CarbonEnv, DataFrame, Dataset, Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Literal, NamedExpression} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} +import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.util.CausedBy + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.converter.SparkDataTypeConverterImpl +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema} +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} +import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.events.exception.PreEventException +import org.apache.carbondata.events.OperationContext +import org.apache.carbondata.processing.loading.TableProcessingOperations +import org.apache.carbondata.processing.loading.exception.NoRetryException +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.processing.util.CarbonLoaderUtil +import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory +import org.apache.carbondata.spark.util.CarbonScalaUtil + +/* +* insert into without df, by just using logical plan +* +*/ +case class CarbonInsertIntoCommand(databaseNameOp: Option[String], + tableName: String, + options: Map[String, String], + isOverwriteTable: Boolean, + var logicalPlan: LogicalPlan, + var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(), + var inputSqlString: String = null, + var tableInfoOp: Option[TableInfo] = None, + var internalOptions: Map[String, String] = Map.empty, + var partition: Map[String, Option[String]] = Map.empty, + var operationContext: OperationContext = new OperationContext) extends AtomicRunnableCommand { - var loadCommand: CarbonLoadDataCommand = _ + var table: CarbonTable = _ + + var logicalPartitionRelation: LogicalRelation = _ + + var sizeInBytes: Long = _ + + var currPartitions: util.List[PartitionSpec] = _ + + var parentTablePath: String = _ + + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + var scanResultRdd: RDD[InternalRow] = _ + + var timeStampFormat: SimpleDateFormat = _ + + var dateFormat: SimpleDateFormat = _ + + var finalPartition: Map[String, Option[String]] = Map.empty + + var isInsertIntoWithConverterFlow: Boolean = false + + var dataFrame: DataFrame = _ override def processMetadata(sparkSession: SparkSession): Seq[Row] = { - setAuditTable(relation.carbonTable.getDatabaseName, relation.carbonTable.getTableName) - val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - def containsLimit(plan: LogicalPlan): Boolean = { - plan find { - case limit: GlobalLimit => true - case other => false - } isDefined + if (!tableInfoOp.isDefined) { + throw new RuntimeException(" table info must be present when logical relation exist") } - + // If logical plan is unresolved, need to convert it to resolved. + dataFrame = Dataset.ofRows(sparkSession, logicalPlan) + logicalPlan = dataFrame.queryExecution.analyzed + var isInsertFromTable = false + logicalPlan.collect { + case _: LogicalRelation => + isInsertFromTable = true + } + // Currently projection re-ordering is based on schema ordinal, + // for some scenarios in alter table scenario, schema ordinal logic cannot be applied. + // So, sending it to old flow + // TODO: Handle this in future, this must use new flow. + if (!isInsertFromTable || isAlteredSchema(tableInfoOp.get.getFactTable)) { + isInsertIntoWithConverterFlow = true + } + if (isInsertIntoWithConverterFlow) { + return Seq.empty + } + setAuditTable(CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession), tableName) ThreadLocalSessionInfo .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf()) - val isPersistEnabledUserValue = CarbonProperties.getInstance - .getProperty(CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED, - CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED_DEFAULT) - val isPersistRequired = - isPersistEnabledUserValue.equalsIgnoreCase("true") || containsLimit(child) - val df = - if (isPersistRequired) { - LOGGER.info("Persist enabled for Insert operation") - Dataset.ofRows(sparkSession, child).persist( - StorageLevel.fromString( - CarbonProperties.getInstance.getInsertIntoDatasetStorageLevel)) + val (sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition) = CommonLoadUtils + .processMetadataCommon( + sparkSession, + databaseNameOp, + tableName, + tableInfoOp, + partition) + this.sizeInBytes = sizeInBytes + this.table = table + this.logicalPartitionRelation = logicalPartitionRelation + this.finalPartition = finalPartition + setAuditTable(dbName, tableName) + Seq.empty + } + + override def processData(sparkSession: SparkSession): Seq[Row] = { + if (isInsertIntoWithConverterFlow) { + return CarbonInsertIntoWithDf( + databaseNameOp, + tableName, + options, + isOverwriteTable, + dimFilesPath, + dataFrame, + inputSqlString, + None, + tableInfoOp, + internalOptions, + partition).process(sparkSession) + } + val carbonProperty: CarbonProperties = CarbonProperties.getInstance() + val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession) + val hadoopConf = sparkSession.sessionState.newHadoopConf() + carbonProperty.addProperty("zookeeper.enable.lock", "false") + val factPath = "" + currPartitions = CommonLoadUtils.getCurrentParitions(sparkSession, table) + CommonLoadUtils.setNumberOfCoresWhileLoading(sparkSession, carbonProperty) + val optionsFinal: util.Map[String, String] = + CommonLoadUtils.getFinalLoadOptions( + carbonProperty, table, options) + val carbonLoadModel: CarbonLoadModel = CommonLoadUtils.prepareLoadModel( + hadoopConf, + factPath, + optionsFinal, parentTablePath, table, isDataFrame = true, internalOptions, partition, options) + + val (tf, df) = CommonLoadUtils.getTimeAndDateFormatFromLoadModel( + carbonLoadModel) Review comment: ok ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services |
Free forum by Nabble | Edit this page |