[GitHub] [carbondata] ajantha-bhat opened a new pull request #3538: [WIP] Separate Insert and load to later optimize insert.

[GitHub] [carbondata] jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

URL: https://github.com/apache/carbondata/pull/3538#discussion_r376191565
 
 

 ##########
 File path: integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
 ##########
 @@ -76,9 +76,21 @@ public static boolean isBlockWithoutBlockletInfoExists(List<CarbonInputSplit> sp
     } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(carbonDataType)) {
       return DataTypes.createDecimalType();
     } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.TIMESTAMP) {
+      if (forGlobalSort) {
 
 Review comment:
   Timestamp is not a direct dictionary column; is this needed?


[GitHub] [carbondata] jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

URL: https://github.com/apache/carbondata/pull/3538#discussion_r376191797
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
 ##########
 @@ -179,6 +182,93 @@ object DataLoadProcessBuilderOnSpark {
     updateLoadStatus(model, partialSuccessAccum)
   }
 
+  def insertDataUsingGlobalSortWithInternalRow(sparkSession: SparkSession,
 
 Review comment:
   Move `sparkSession: SparkSession,` to the next line.
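
   For illustration, the declaration re-wrapped as suggested (same types as in the quoted diff; the body is elided):
   ```scala
   def insertDataUsingGlobalSortWithInternalRow(
       sparkSession: SparkSession,
       scanResultRDD: RDD[InternalRow],
       model: CarbonLoadModel,
       hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
     ??? // existing body elided
   }
   ```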


[GitHub] [carbondata] jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

URL: https://github.com/apache/carbondata/pull/3538#discussion_r376192708
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
 ##########
 @@ -179,6 +182,93 @@ object DataLoadProcessBuilderOnSpark {
     updateLoadStatus(model, partialSuccessAccum)
   }
 
+  def insertDataUsingGlobalSortWithInternalRow(sparkSession: SparkSession,
+      scanResultRDD : RDD[InternalRow],
+      model: CarbonLoadModel,
+      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+    val originRDD = scanResultRDD
+
+    val sc = sparkSession.sparkContext
+    val modelBroadcast = sc.broadcast(model)
+    val partialSuccessAccum = sc.longAccumulator("Partial Success Accumulator")
+    val sortStepRowCounter = sc.longAccumulator("Sort Processor Accumulator")
+    val writeStepRowCounter = sc.longAccumulator("Write Processor Accumulator")
+
+    hadoopConf
+      .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
+
+    val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
+    val configuration = DataLoadProcessBuilder.createConfiguration(model)
+
+    // 1. Convert internalRow to object array
+    val fields = Util
+      .convertToSparkSchemaFromColumnSchema(model.getCarbonDataLoadSchema.getCarbonTable, true)
+      .fields
+      .toSeq
+
+    val dataTypes = fields.map(field => field.dataType)
+    val map: mutable.Map[String, GenericDataType[_]] = mutable.Map[String, GenericDataType[_]]()
+    CommonUtil.convertComplexDataType(map, configuration)
+    val rdd = originRDD.map { internalRow =>
+      CommonUtil.getObjectArrayFromInternalRowAndConvertComplexTypeForGlobalSort(internalRow,
+        fields,
+        map)
+    }
+
+    // 2. sort
+    var numPartitions = CarbonDataProcessorUtil.getGlobalSortPartitions(
+      configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS))
+    if (numPartitions <= 0) {
+      numPartitions = originRDD.partitions.length
+    }
+    // If the number of partitions is greater than 1, sortBy triggers an action
+    // (sample), so cache the RDD here to avoid reading and converting the input again.
+    if (numPartitions > 1) {
+      rdd.persist(StorageLevel.fromString(
+        CarbonProperties.getInstance().getGlobalSortRddStorageLevel()))
+    }
+    val sortColumnsLength = model.getCarbonDataLoadSchema.getCarbonTable.getSortColumns.size()
+    val sortColumnDataTypes = dataTypes.take(sortColumnsLength)
+    val rowComparator = GlobalSortHelper.generateRowComparator(sortColumnDataTypes)
+    val sortRDD = rdd.sortBy(x => getKey(x, sortColumnsLength, sortColumnDataTypes),
 
 Review comment:
   Move `getKey(x, sortColumnsLength, sortColumnDataTypes)` to the next line and use a pattern like:
   ```
   rdd.sortBy { row =>
     ...
   }
   ```
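
   As a minimal self-contained illustration of the suggested brace-style `sortBy` (toy data; the real code would extract the key with `getKey` and pass the row comparator as the ordering):
   ```scala
   import org.apache.spark.sql.SparkSession

   object SortByStyleExample {
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder()
         .master("local[2]")
         .appName("sortBy-style")
         .getOrCreate()
       // Toy rows standing in for the converted object arrays.
       val rdd = spark.sparkContext.parallelize(Seq(("b", 2), ("c", 3), ("a", 1)))
       // Key extraction moved into a brace block on its own line, as suggested.
       val sorted = rdd.sortBy { row =>
         row._1
       }
       sorted.collect().foreach(println) // (a,1), (b,2), (c,3)
       spark.stop()
     }
   }
   ```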


[GitHub] [carbondata] jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

URL: https://github.com/apache/carbondata/pull/3538#discussion_r376193657
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
 ##########
 @@ -834,4 +843,179 @@ object CommonUtil {
     displaySize
   }
 
+  def getObjectArrayFromInternalRowAndConvertComplexType(row: InternalRow,
+      fieldTypes: Seq[DataType],
+      outputArrayLength: Int): Array[AnyRef] = {
+    val data = new Array[AnyRef](outputArrayLength)
+    var i = 0
+    val fieldTypesLen = fieldTypes.length
+    while (i < fieldTypesLen) {
+      if (!row.isNullAt(i)) {
+        fieldTypes(i) match {
+          case StringType =>
+            data(i) = row.getString(i)
+          case d: DecimalType =>
+            data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+          case arrayType : ArrayType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getArray(i), arrayType)
+          case structType : StructType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getStruct(i,
+              structType.fields.length), structType)
+          case mapType : MapType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getMap(i), mapType)
+          case other =>
+            data(i) = row.get(i, other)
+        }
+      }
+      i += 1
+    }
+    data
+  }
+
+  /**
+   * After converting complex objects to carbon objects,
+   * the complex fields also need to be converted to byte arrays.
+   *
+   * @param row the input InternalRow
+   * @param fields the Spark schema fields of the target table
+   * @param dataFieldsWithComplexDataType map from complex column name to its GenericDataType
+   * @return the converted object array
+   */
+  def getObjectArrayFromInternalRowAndConvertComplexTypeForGlobalSort(
+      row: InternalRow,
+      fields: Seq[StructField],
+      dataFieldsWithComplexDataType: Map[String, GenericDataType[_]]): Array[AnyRef] = {
 
 Review comment:
   Should the return type be `Array[Byte]`?


[GitHub] [carbondata] jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

URL: https://github.com/apache/carbondata/pull/3538#discussion_r376193992
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 ##########
 @@ -957,24 +987,36 @@ object CarbonDataRDDFactory {
   private def loadDataFrame(
       sqlContext: SQLContext,
       dataFrame: Option[DataFrame],
+      scanResultRDD: Option[RDD[InternalRow]],
 
 Review comment:
   Since you modified this function, can you add parameter descriptions to the function header?
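
   A sketch of what such a header could look like; the parameter descriptions are illustrative, not from the PR:
   ```scala
   /**
    * Loads the input rows into the carbon table.
    *
    * @param sqlContext      the active SQLContext
    * @param dataFrame       input rows when inserting from a DataFrame, if any
    * @param scanResultRDD   input rows as InternalRow when using the optimized insert flow, if any
    * @param carbonLoadModel load model describing the target table and load options
    * @return per-partition load status and execution errors
    */
   private def loadDataFrame(
       sqlContext: SQLContext,
       dataFrame: Option[DataFrame],
       scanResultRDD: Option[RDD[InternalRow]],
       carbonLoadModel: CarbonLoadModel
   ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = ???
   ```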


[GitHub] [carbondata] jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

URL: https://github.com/apache/carbondata/pull/3538#discussion_r376194051
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 ##########
 @@ -957,24 +987,36 @@ object CarbonDataRDDFactory {
   private def loadDataFrame(
       sqlContext: SQLContext,
       dataFrame: Option[DataFrame],
+      scanResultRDD: Option[RDD[InternalRow]],
       carbonLoadModel: CarbonLoadModel
   ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
     try {
-      val rdd = dataFrame.get.rdd
-
+      val rdd = if (dataFrame.isDefined) {
+        dataFrame.get.rdd
+      } else {
+        scanResultRDD.get
+      }
       val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]] { p =>
         DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
       }.distinct.length
       val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(
         nodeNumOfData,
         sqlContext.sparkContext)
-      val newRdd = new DataLoadCoalescedRDD[Row](sqlContext.sparkSession, rdd, nodes.toArray
-        .distinct)
-
+      val newRdd =
+        if (dataFrame.isDefined) {
+          new DataLoadCoalescedRDD[Row](sqlContext.sparkSession, dataFrame.get.rdd, nodes.toArray
+            .distinct)
+        } else {
+          new DataLoadCoalescedRDD[InternalRow](sqlContext.sparkSession,
 
 Review comment:
   Move `sqlContext.sparkSession,` to the next line.
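
   Re-wrapped as asked, the `else` branch would read (same arguments, one per line):
   ```scala
   new DataLoadCoalescedRDD[InternalRow](
     sqlContext.sparkSession,
     scanResultRDD.get,
     nodes.toArray.distinct)
   ```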


[GitHub] [carbondata] jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

URL: https://github.com/apache/carbondata/pull/3538#discussion_r376194067
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 ##########
 @@ -957,24 +987,36 @@ object CarbonDataRDDFactory {
+      val newRdd =
+        if (dataFrame.isDefined) {
+          new DataLoadCoalescedRDD[Row](sqlContext.sparkSession, dataFrame.get.rdd, nodes.toArray
+            .distinct)
+        } else {
+          new DataLoadCoalescedRDD[InternalRow](sqlContext.sparkSession,
+            scanResultRDD.get,
+            nodes.toArray
+              .distinct)
 
 Review comment:
   Move `.distinct` to the previous line, i.e. `nodes.toArray.distinct`.


[GitHub] [carbondata] jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

URL: https://github.com/apache/carbondata/pull/3538#discussion_r376194251
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 ##########
 @@ -957,24 +987,36 @@ object CarbonDataRDDFactory {
+      val newRdd =
+        if (dataFrame.isDefined) {
+          new DataLoadCoalescedRDD[Row](sqlContext.sparkSession, dataFrame.get.rdd, nodes.toArray
+            .distinct)
+        } else {
+          new DataLoadCoalescedRDD[InternalRow](sqlContext.sparkSession,
+            scanResultRDD.get,
+            nodes.toArray
+              .distinct)
 
 Review comment:
   Is this for local sort? Why take the distinct of the nodes?


[GitHub] [carbondata] jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

URL: https://github.com/apache/carbondata/pull/3538#discussion_r376194470
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##########
 @@ -17,85 +17,518 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
-import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataCommand}
-import org.apache.spark.storage.StorageLevel
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
-
-case class CarbonInsertIntoCommand(
-    relation: CarbonDatasourceHadoopRelation,
-    child: LogicalPlan,
-    overwrite: Boolean,
-    partition: Map[String, Option[String]])
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.CausedBy
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * Insert into command that operates directly on the logical plan,
+ * without creating a DataFrame.
+ */
+case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
+    tableName: String,
+    options: Map[String, String],
+    isOverwriteTable: Boolean,
+    var logicalPlan: LogicalPlan,
+    var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
+    var inputSqlString: String = null,
+    var tableInfoOp: Option[TableInfo] = None,
+    var internalOptions: Map[String, String] = Map.empty,
+    var partition: Map[String, Option[String]] = Map.empty,
+    var operationContext: OperationContext = new OperationContext)
   extends AtomicRunnableCommand {
 
-  var loadCommand: CarbonLoadDataCommand = _
+  var table: CarbonTable = _
+
+  var logicalPartitionRelation: LogicalRelation = _
+
+  var sizeInBytes: Long = _
+
+  var currPartitions: util.List[PartitionSpec] = _
+
+  var parentTablePath: String = _
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  var scanResultRdd: RDD[InternalRow] = _
+
+  var timeStampFormat: SimpleDateFormat = _
+
+  var dateFormat: SimpleDateFormat = _
+
+  var finalPartition: Map[String, Option[String]] = Map.empty
+
+  var isInsertIntoWithConverterFlow: Boolean = false
+
+  var dataFrame: DataFrame = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    setAuditTable(relation.carbonTable.getDatabaseName, relation.carbonTable.getTableName)
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    def containsLimit(plan: LogicalPlan): Boolean = {
-      plan find {
-        case limit: GlobalLimit => true
-        case other => false
-      } isDefined
+    if (!tableInfoOp.isDefined) {
+      throw new RuntimeException("table info must be present when logical relation exists")
     }
-
+    // If the logical plan is unresolved, convert it to a resolved plan.
+    dataFrame = Dataset.ofRows(sparkSession, logicalPlan)
+    logicalPlan = dataFrame.queryExecution.analyzed
+    var isInsertFromTable = false
+    logicalPlan.collect {
+      case _: LogicalRelation =>
+        isInsertFromTable = true
+    }
+    // Projection re-ordering is currently based on schema ordinal;
+    // in some alter-table scenarios the schema ordinal logic cannot be applied,
+    // so fall back to the old flow.
+    // TODO: handle this in the future; it should use the new flow.
+    if (!isInsertFromTable || isAlteredSchema(tableInfoOp.get.getFactTable)) {
+      isInsertIntoWithConverterFlow = true
+    }
+    if (isInsertIntoWithConverterFlow) {
+      return Seq.empty
+    }
+    setAuditTable(CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession), tableName)
     ThreadLocalSessionInfo
       .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
-    val isPersistEnabledUserValue = CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED,
-        CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED_DEFAULT)
-    val isPersistRequired =
-      isPersistEnabledUserValue.equalsIgnoreCase("true") || containsLimit(child)
-    val df =
-      if (isPersistRequired) {
-        LOGGER.info("Persist enabled for Insert operation")
-        Dataset.ofRows(sparkSession, child).persist(
-          StorageLevel.fromString(
-            CarbonProperties.getInstance.getInsertIntoDatasetStorageLevel))
+    val (sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition) = CommonLoadUtils
+      .processMetadataCommon(
+        sparkSession,
+        databaseNameOp,
+        tableName,
+        tableInfoOp,
+        partition)
+    this.sizeInBytes = sizeInBytes
+    this.table = table
+    this.logicalPartitionRelation = logicalPartitionRelation
+    this.finalPartition = finalPartition
+    setAuditTable(dbName, tableName)
+    Seq.empty
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (isInsertIntoWithConverterFlow) {
+      return CarbonInsertIntoWithDf(
+        databaseNameOp,
 
 Review comment:
   Since this function has many parameters, add parameter names, like `databaseNameOp = databaseNameOp`.
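
   A sketch of the named-argument style being requested. The names mirror the arguments passed at the call site; the parameter name for the `None` argument (`updateModel` below) is a guess, since it is not visible in the diff:
   ```scala
   CarbonInsertIntoWithDf(
     databaseNameOp = databaseNameOp,
     tableName = tableName,
     options = options,
     isOverwriteTable = isOverwriteTable,
     dimFilesPath = dimFilesPath,
     dataFrame = dataFrame,
     inputSqlString = inputSqlString,
     updateModel = None, // hypothetical parameter name, not shown in the diff
     tableInfoOp = tableInfoOp,
     internalOptions = internalOptions,
     partition = partition).process(sparkSession)
   ```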


[GitHub] [carbondata] jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

URL: https://github.com/apache/carbondata/pull/3538#discussion_r376194652
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##########
 @@ -17,85 +17,518 @@
 
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (isInsertIntoWithConverterFlow) {
+      return CarbonInsertIntoWithDf(
+        databaseNameOp,
+        tableName,
+        options,
+        isOverwriteTable,
+        dimFilesPath,
+        dataFrame,
+        inputSqlString,
+        None,
+        tableInfoOp,
+        internalOptions,
+        partition).process(sparkSession)
+    }
+    val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
+    val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    carbonProperty.addProperty("zookeeper.enable.lock", "false")
+    val factPath = ""
+    currPartitions = CommonLoadUtils.getCurrentParitions(sparkSession, table)
+    CommonLoadUtils.setNumberOfCoresWhileLoading(sparkSession, carbonProperty)
+    val optionsFinal: util.Map[String, String] =
+      CommonLoadUtils.getFinalLoadOptions(
+      carbonProperty, table, options)
+    val carbonLoadModel: CarbonLoadModel = CommonLoadUtils.prepareLoadModel(
 
 Review comment:
   Format the parameter list (one parameter per line).
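
   A sketch with one argument per line; the parameter names are assumed to mirror the local variables (only `isDataFrame` is confirmed by the original call):
   ```scala
   val carbonLoadModel: CarbonLoadModel = CommonLoadUtils.prepareLoadModel(
     hadoopConf = hadoopConf,
     factPath = factPath,
     optionsFinal = optionsFinal,
     parentTablePath = parentTablePath,
     table = table,
     isDataFrame = true,
     internalOptions = internalOptions,
     partition = partition,
     options = options)
   ```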


[GitHub] [carbondata] jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

URL: https://github.com/apache/carbondata/pull/3538#discussion_r376194673
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##########
 @@ -17,85 +17,518 @@
 
+    val carbonLoadModel: CarbonLoadModel = CommonLoadUtils.prepareLoadModel(
+      hadoopConf,
+      factPath,
+      optionsFinal, parentTablePath, table, isDataFrame = true, internalOptions, partition, options)
+
+    val (tf, df) = CommonLoadUtils.getTimeAndDateFormatFromLoadModel(
+      carbonLoadModel)
 
 Review comment:
   Move `carbonLoadModel` to the previous line.
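
   That is, the call fits on a single line:
   ```scala
   val (tf, df) = CommonLoadUtils.getTimeAndDateFormatFromLoadModel(carbonLoadModel)
   ```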


[GitHub] [carbondata] jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

URL: https://github.com/apache/carbondata/pull/3538#discussion_r376194756
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##########
 @@ -17,85 +17,518 @@
 
+    val (tf, df) = CommonLoadUtils.getTimeAndDateFormatFromLoadModel(
+      carbonLoadModel)
+    timeStampFormat = tf
+    dateFormat = df
+
+    var complexChildCount: Int = 0
+    var reArrangedIndex: Seq[Int] = Seq()
+    var selectedColumnSchema: Seq[ColumnSchema] = Seq()
+    var partitionIndex: Seq[Int] = Seq()
+
+    val columnSchema = tableInfoOp.get.getFactTable.getListOfColumns.asScala
+    val partitionInfo = tableInfoOp.get.getFactTable.getPartitionInfo
+    val partitionColumnSchema =
+      if (partitionInfo != null && partitionInfo.getColumnSchemaList.size() != 0) {
+        partitionInfo.getColumnSchemaList.asScala
+      } else {
+        null
+      }
+    val convertedStaticPartition = mutable.Map[String, AnyRef]()
+    // Remove the thread local entries of previous configurations.
 
 Review comment:
   Seems to be an invalid comment.


[GitHub] [carbondata] jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

URL: https://github.com/apache/carbondata/pull/3538#discussion_r376194827
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##########
 @@ -17,85 +17,518 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
-import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataCommand}
-import org.apache.spark.storage.StorageLevel
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
-
-case class CarbonInsertIntoCommand(
-    relation: CarbonDatasourceHadoopRelation,
-    child: LogicalPlan,
-    overwrite: Boolean,
-    partition: Map[String, Option[String]])
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.CausedBy
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/*
+ * Insert into a carbon table without a DataFrame, using only the logical plan.
+ */
+case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
+    tableName: String,
+    options: Map[String, String],
+    isOverwriteTable: Boolean,
+    var logicalPlan: LogicalPlan,
+    var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
+    var inputSqlString: String = null,
+    var tableInfoOp: Option[TableInfo] = None,
+    var internalOptions: Map[String, String] = Map.empty,
+    var partition: Map[String, Option[String]] = Map.empty,
+    var operationContext: OperationContext = new OperationContext)
   extends AtomicRunnableCommand {
 
-  var loadCommand: CarbonLoadDataCommand = _
+  var table: CarbonTable = _
+
+  var logicalPartitionRelation: LogicalRelation = _
+
+  var sizeInBytes: Long = _
+
+  var currPartitions: util.List[PartitionSpec] = _
+
+  var parentTablePath: String = _
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  var scanResultRdd: RDD[InternalRow] = _
+
+  var timeStampFormat: SimpleDateFormat = _
+
+  var dateFormat: SimpleDateFormat = _
+
+  var finalPartition: Map[String, Option[String]] = Map.empty
+
+  var isInsertIntoWithConverterFlow: Boolean = false
+
+  var dataFrame: DataFrame = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    setAuditTable(relation.carbonTable.getDatabaseName, relation.carbonTable.getTableName)
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    def containsLimit(plan: LogicalPlan): Boolean = {
-      plan find {
-        case limit: GlobalLimit => true
-        case other => false
-      } isDefined
+    if (!tableInfoOp.isDefined) {
+      throw new RuntimeException("table info must be present when logical relation exists")
     }
-
+    // If logical plan is unresolved, need to convert it to resolved.
+    dataFrame = Dataset.ofRows(sparkSession, logicalPlan)
+    logicalPlan = dataFrame.queryExecution.analyzed
+    var isInsertFromTable = false
+    logicalPlan.collect {
+      case _: LogicalRelation =>
+        isInsertFromTable = true
+    }
+    // Currently projection re-ordering is based on schema ordinal.
+    // In some alter table scenarios the schema ordinal logic cannot be applied,
+    // so those cases are sent to the old flow.
+    // TODO: Handle this in future; it should also use the new flow.
+    if (!isInsertFromTable || isAlteredSchema(tableInfoOp.get.getFactTable)) {
+      isInsertIntoWithConverterFlow = true
+    }
+    if (isInsertIntoWithConverterFlow) {
+      return Seq.empty
+    }
+    setAuditTable(CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession), tableName)
     ThreadLocalSessionInfo
       .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
-    val isPersistEnabledUserValue = CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED,
-        CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED_DEFAULT)
-    val isPersistRequired =
-      isPersistEnabledUserValue.equalsIgnoreCase("true") || containsLimit(child)
-    val df =
-      if (isPersistRequired) {
-        LOGGER.info("Persist enabled for Insert operation")
-        Dataset.ofRows(sparkSession, child).persist(
-          StorageLevel.fromString(
-            CarbonProperties.getInstance.getInsertIntoDatasetStorageLevel))
+    val (sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition) = CommonLoadUtils
+      .processMetadataCommon(
+        sparkSession,
+        databaseNameOp,
+        tableName,
+        tableInfoOp,
+        partition)
+    this.sizeInBytes = sizeInBytes
+    this.table = table
+    this.logicalPartitionRelation = logicalPartitionRelation
+    this.finalPartition = finalPartition
+    setAuditTable(dbName, tableName)
+    Seq.empty
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (isInsertIntoWithConverterFlow) {
+      return CarbonInsertIntoWithDf(
+        databaseNameOp,
+        tableName,
+        options,
+        isOverwriteTable,
+        dimFilesPath,
+        dataFrame,
+        inputSqlString,
+        None,
+        tableInfoOp,
+        internalOptions,
+        partition).process(sparkSession)
+    }
+    val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
+    val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    carbonProperty.addProperty("zookeeper.enable.lock", "false")
+    val factPath = ""
+    currPartitions = CommonLoadUtils.getCurrentParitions(sparkSession, table)
+    CommonLoadUtils.setNumberOfCoresWhileLoading(sparkSession, carbonProperty)
+    val optionsFinal: util.Map[String, String] =
+      CommonLoadUtils.getFinalLoadOptions(
+      carbonProperty, table, options)
+    val carbonLoadModel: CarbonLoadModel = CommonLoadUtils.prepareLoadModel(
+      hadoopConf,
+      factPath,
+      optionsFinal, parentTablePath, table, isDataFrame = true, internalOptions, partition, options)
+
+    val (tf, df) = CommonLoadUtils.getTimeAndDateFormatFromLoadModel(
+      carbonLoadModel)
+    timeStampFormat = tf
+    dateFormat = df
+
+    var complexChildCount: Int = 0
+    var reArrangedIndex: Seq[Int] = Seq()
+    var selectedColumnSchema: Seq[ColumnSchema] = Seq()
+    var partitionIndex: Seq[Int] = Seq()
+
+    val columnSchema = tableInfoOp.get.getFactTable.getListOfColumns.asScala
+    val partitionInfo = tableInfoOp.get.getFactTable.getPartitionInfo
+    val partitionColumnSchema =
+      if (partitionInfo != null && partitionInfo.getColumnSchemaList.size() != 0) {
+        partitionInfo.getColumnSchemaList.asScala
+      } else {
+        null
+      }
+    val convertedStaticPartition = mutable.Map[String, AnyRef]()
+    // Remove the thread local entries of previous configurations.
+    DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
+    if (partition.nonEmpty) {
+      for (col <- partitionColumnSchema) {
+        if (partition(col.getColumnName.toLowerCase).isDefined) {
+          convertedStaticPartition(col.getColumnName.toLowerCase) =
+            CarbonScalaUtil.convertStaticPartitionToValues(partition(col.getColumnName.toLowerCase)
 
 Review comment:
   move `partition(col.getColumnName.toLowerCase)` to next line
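
For example, the call could wrap as follows (a formatting sketch of the same code, nothing more):

    convertedStaticPartition(col.getColumnName.toLowerCase) =
      CarbonScalaUtil.convertStaticPartitionToValues(
        partition(col.getColumnName.toLowerCase).get,
        SparkDataTypeConverterImpl.convertCarbonToSparkDataType(col.getDataType),
        timeStampFormat,
        dateFormat)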


[GitHub] [carbondata] jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376195096
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##########
 @@ -17,85 +17,518 @@
 
 [diff context identical to the previous comment; elided down to the line under review]
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
 
 Review comment:
   this function is too big; can you split the logic into smaller, more manageable functions?
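
One possible split, keeping the existing steps in order (a sketch; the helper names are hypothetical, not from the patch):

    override def processData(sparkSession: SparkSession): Seq[Row] = {
      if (isInsertIntoWithConverterFlow) {
        // fall back to the DataFrame-based converter flow
        return insertWithConverterFlow(sparkSession)       // hypothetical helper
      }
      val carbonLoadModel = buildLoadModel(sparkSession)   // hypothetical helper
      val newLogicalPlan = reArrangeProjection()           // hypothetical helper
      scanResultRdd = sparkSession.sessionState.executePlan(newLogicalPlan).toRdd
      executeLoadWithEvents(sparkSession, carbonLoadModel) // hypothetical helper
      Seq.empty
    }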


[GitHub] [carbondata] jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376195210
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##########
 @@ -17,85 +17,518 @@
 
 [diff context identical to the previous comments; elided down to the new lines under review]
+    if (partition.nonEmpty) {
+      for (col <- partitionColumnSchema) {
+        if (partition(col.getColumnName.toLowerCase).isDefined) {
+          convertedStaticPartition(col.getColumnName.toLowerCase) =
+            CarbonScalaUtil.convertStaticPartitionToValues(partition(col.getColumnName.toLowerCase)
+              .get,
+              SparkDataTypeConverterImpl.convertCarbonToSparkDataType(col.getDataType),
+              timeStampFormat,
+              dateFormat)
+        }
+      }
+    }
+    val partitionColumnNames = if (partitionColumnSchema != null) {
+      partitionColumnSchema.map(x => x.getColumnName).toSet
+    } else {
+      null
+    }
+    // get invisible column indexes; alter table scenarios can have them before or after a new column.
+    // A dummy measure has ordinal -1 and is invisible; ignore that column.
+    // alter table old columns are just invisible columns with a proper ordinal
+    val invisibleIndex = columnSchema.filter(col => col.isInvisible && col.getSchemaOrdinal != -1)
+      .map(col => col.getSchemaOrdinal)
+    columnSchema.filterNot(col => col.isInvisible).foreach {
+      col =>
+        var skipPartitionColumn = false
+        if (col.getColumnName.contains(".")) {
+          // If the schema ordinal is -1,
+          // no need to consider it during shifting columns to derive new shifted ordinal
+          if (col.getSchemaOrdinal != -1) {
+            complexChildCount = complexChildCount + 1
+          }
+        } else {
+          // get the number of invisible indexes before this column
+          val invisibleIndexCount = invisibleIndex.count(index => index < col.getSchemaOrdinal)
+          if (col.getDataType.isComplexType) {
+            // Calculate re-arrange index by ignoring the complex child count.
+            // As projection will have only parent columns
+            reArrangedIndex = reArrangedIndex :+
+                              (col.getSchemaOrdinal - complexChildCount - invisibleIndexCount)
+          } else {
+            if (partitionColumnNames != null && partitionColumnNames.contains(col.getColumnName)) {
+              partitionIndex = partitionIndex :+ (col.getSchemaOrdinal - invisibleIndexCount)
+              skipPartitionColumn = true
+            } else {
+              reArrangedIndex = reArrangedIndex :+ (col.getSchemaOrdinal - invisibleIndexCount)
+            }
+          }
+          if (!skipPartitionColumn) {
+            selectedColumnSchema = selectedColumnSchema :+ col
+          }
+        }
+    }
+    if (partitionColumnSchema != null) {
+      // keep partition columns in the end
+      selectedColumnSchema = selectedColumnSchema ++ partitionColumnSchema
+    }
+    if (partitionIndex.nonEmpty) {
+      // keep partition columns in the end and in the original create order
+      reArrangedIndex = reArrangedIndex ++ partitionIndex.sortBy(x => x)
+    }
+    var processedProject: Boolean = false
+    // check whether the first node is a projection
+    logicalPlan match {
+      case _: Project =>
+        // project is already present as first node
+      case _ =>
+        // If project is not present, add the projection to re-arrange it
+        logicalPlan = Project(logicalPlan.output, logicalPlan)
+    }
+    // Re-arrange the project as per columnSchema
+    val newLogicalPlan = logicalPlan.transformDown {
+      //      case logicalRelation: LogicalRelation =>
+      //        getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalRelation)
+      //      case hiveRelation: HiveTableRelation =>
+      //        getReArrangedSchemaHiveRelation(reArrangedIndex, hiveRelation)
+      case p: Project =>
+        var oldProjectionList = p.projectList
+        if (!processedProject) {
+          if (partition.nonEmpty) {
+            // partition keyword is present in the insert, and the
+            // select query's partition projections may not match the create order.
+            // So, bring them to the create table order
+            val dynamicPartition = partition.filterNot(entry => entry._2.isDefined)
+            var index = 0
+            val map = mutable.Map[String, Int]()
+            for (part <- dynamicPartition) {
+              map(part._1) = index
+              index = index + 1
+            }
+            var tempList = oldProjectionList.take(oldProjectionList.size - dynamicPartition.size)
+            val partitionList = oldProjectionList.takeRight(dynamicPartition.size)
+            val partitionSchema = table.getPartitionInfo.getColumnSchemaList.asScala
+            for (partitionCol <- partitionSchema) {
+              if (map.get(partitionCol.getColumnName).isDefined) {
+                tempList = tempList :+ partitionList(map(partitionCol.getColumnName))
+              }
+            }
+            oldProjectionList = tempList
+          }
+          if (reArrangedIndex.size != oldProjectionList.size) {
+            // for non-partition table columns must match
+            if (partition.isEmpty) {
+              throw new AnalysisException(
+                s"Cannot insert into table $tableName because the number of columns are " +
+                s"different: " +
+                s"need ${ reArrangedIndex.size } columns, " +
+                s"but query has ${ oldProjectionList.size } columns.")
+            } else {
+              if (reArrangedIndex.size - oldProjectionList.size != convertedStaticPartition.size) {
+                throw new AnalysisException(
+                  s"Cannot insert into table $tableName because the number of columns are " +
+                  s"different: need ${ reArrangedIndex.size } columns, " +
+                  s"but query has ${ oldProjectionList.size } columns.")
+              } else {
+                // TODO: For the partition case, do the remaining projections need validation?
+              }
+            }
+          }
+          var newProjectionList: Seq[NamedExpression] = Seq.empty
+          var i = 0
+          while (i < reArrangedIndex.size) {
+            // column schema already has sortColumns-dimensions-measures. Collect the ordinal &
+            // re-arrange the projection in the same order
+            if (partition.nonEmpty &&
+                convertedStaticPartition.contains(selectedColumnSchema(i).getColumnName
+                  .toLowerCase())) {
+              // If the column schema is present in partitionSchema, it is a static partition;
+              // add a value literal expression in the project.
+              val value = convertedStaticPartition(selectedColumnSchema(i).getColumnName
+                .toLowerCase())
+              newProjectionList = newProjectionList :+
+                                  Alias(new Literal(value,
+                                    SparkDataTypeConverterImpl.convertCarbonToSparkDataType(
+                                      selectedColumnSchema(i).getDataType)), value.toString)(
+                                    NamedExpression.newExprId,
+                                    None,
+                                    None).asInstanceOf[NamedExpression]
+            } else {
+              // If the column schema is NOT a static partition column,
+              // get the projection column mapped to its ordinal.
+              if (partition.contains(selectedColumnSchema(i).getColumnName.toLowerCase())) {
+                // static partition + dynamic partition case,
+                // here dynamic partition ordinal will be more than projection size
+                newProjectionList = newProjectionList :+
+                                    oldProjectionList(
+                                      reArrangedIndex(i) - convertedStaticPartition.size)
+              } else {
+                newProjectionList = newProjectionList :+
+                                    oldProjectionList(reArrangedIndex(i))
+              }
+            }
+            i = i + 1
+          }
+          processedProject = true
+          Project(newProjectionList, p.child)
+        } else {
+          p
+        }
+    }
+    scanResultRdd = sparkSession.sessionState.executePlan(newLogicalPlan).toRdd
+    if (logicalPartitionRelation != null) {
+      logicalPartitionRelation =
+        getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalPartitionRelation)
+    }
+    // Delete stale segment folders that are not in table status but are physically present in
+    // the Fact folder
+    LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName")
+    TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
+    var isUpdateTableStatusRequired = false
+    val uuid = ""
+    try {
+      val (tableDataMaps, dataMapOperationContext) =
+        CommonLoadUtils.firePreLoadEvents(sparkSession,
+          carbonLoadModel,
+          uuid,
+          table,
+          isOverwriteTable,
+          operationContext)
+      // The system has to partition the data first and then invoke the data load
+      LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
+      // Clean up the old invalid segment data before creating a new entry for new load.
+      SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions)
+      // add the start entry for the new load in the table status file
+      if (!table.isHivePartitionTable) {
+        CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
+          carbonLoadModel,
+          isOverwriteTable)
+        isUpdateTableStatusRequired = true
+      }
+      if (isOverwriteTable) {
+        LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
+      }
+      // Create table and metadata folders if not exist
+      if (carbonLoadModel.isCarbonTransactionalTable) {
+        val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
+        if (!FileFactory.isFileExist(metadataDirectoryPath)) {
+          FileFactory.mkdirs(metadataDirectoryPath)
+        }
       } else {
-        Dataset.ofRows(sparkSession, child)
+        carbonLoadModel.setSegmentId(System.nanoTime().toString)
       }
-    val header = relation.tableSchema.get.fields.map(_.name).mkString(",")
-    loadCommand = CarbonLoadDataCommand(
-      databaseNameOp = Some(relation.carbonRelation.databaseName),
-      tableName = relation.carbonRelation.tableName,
-      factPathFromUser = null,
-      dimFilesPath = Seq(),
-      options = scala.collection.immutable.Map("fileheader" -> header),
-      isOverwriteTable = overwrite,
-      inputSqlString = null,
-      dataFrame = Some(df),
-      updateModel = None,
-      tableInfoOp = None,
-      internalOptions = Map.empty,
-      partition = partition)
-    val load = loadCommand.processMetadata(sparkSession)
-    if (isPersistRequired) {
-      df.unpersist()
-    }
-    load
+      val partitionStatus = SegmentStatus.SUCCESS
+
+      val loadParams = CarbonLoadParams(sparkSession,
+        tableName,
+        sizeInBytes,
+        isOverwriteTable,
+        carbonLoadModel,
+        hadoopConf,
+        logicalPartitionRelation,
+        dateFormat,
+        timeStampFormat,
+        options,
+        finalPartition,
+        currPartitions,
+        partitionStatus,
+        None,
+        Some(scanResultRdd),
+        None,
+        operationContext)
+
+      LOGGER.info("Sort Scope : " + carbonLoadModel.getSortScope)
+      val (rows, loadResult) = insertData(loadParams)
+      val info = CommonLoadUtils.makeAuditInfo(loadResult)
+      setAuditInfo(info)
+      CommonLoadUtils.firePostLoadEvents(sparkSession,
+        carbonLoadModel,
+        tableDataMaps,
+        dataMapOperationContext,
+        table,
+        operationContext)
+    } catch {
+      case CausedBy(ex: NoRetryException) =>
+        // update the load entry in table status file for changing the status to marked for delete
+        if (isUpdateTableStatusRequired) {
+          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
+        }
+        LOGGER.error(s"Dataload failure for $dbName.$tableName", ex)
+        throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
+      // In case of event related exception
+      case preEventEx: PreEventException =>
+        LOGGER.error(s"Dataload failure for $dbName.$tableName", preEventEx)
+        throw new AnalysisException(preEventEx.getMessage)
+      case ex: Exception =>
+        LOGGER.error(ex)
+        // update the load entry in table status file for changing the status to marked for delete
+        if (isUpdateTableStatusRequired) {
+          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
+        }
+        throw ex
+    }
+    Seq.empty
   }
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
-    if (null != loadCommand) {
-      val rows = loadCommand.processData(sparkSession)
-      setAuditInfo(loadCommand.auditInfo)
-      rows
+
+  def getReArrangedSchemaLogicalRelation(reArrangedIndex: Seq[Int],
+      logicalRelation: LogicalRelation): LogicalRelation = {
+    if (reArrangedIndex.size != logicalRelation.schema.size) {
+      throw new AnalysisException(
+        s"Cannot insert into table $tableName because the number of columns are different: " +
+        s"need ${ reArrangedIndex.size } columns, " +
+        s"but query has ${ logicalRelation.schema.size } columns.")
+    }
+    val reArrangedFields = new Array[StructField](logicalRelation.schema.size)
+    val reArrangedAttributes = new Array[AttributeReference](logicalRelation.schema.size)
+    val fields = logicalRelation.schema.fields
+    val output = logicalRelation.output
+    var i = 0
+    for (index <- reArrangedIndex) {
+      reArrangedFields(i) = fields(index)
+      reArrangedAttributes(i) = output(index)
+      i = i + 1
+    }
+    val catalogTable = logicalRelation.catalogTable
+      .get
+      .copy(schema = new StructType(reArrangedFields))
+    logicalRelation.copy(logicalRelation.relation,
+      reArrangedAttributes,
+      Some(catalogTable))
+  }
+
+  def getReArrangedSchemaHiveRelation(reArrangedIndex: Seq[Int],
 
 Review comment:
   add comment to explain the logic
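
A sketch of the kind of comment requested (the wording is a suggestion; the second parameter is assumed by analogy with the LogicalRelation variant above):

    /**
     * Rebuilds the HiveTableRelation so that its output follows reArrangedIndex,
     * i.e. the carbon internal column order (sort columns, dimensions, measures,
     * with partition columns kept at the end), mirroring
     * getReArrangedSchemaLogicalRelation for LogicalRelation.
     */
    def getReArrangedSchemaHiveRelation(reArrangedIndex: Seq[Int],
        hiveRelation: HiveTableRelation): HiveTableRelation = ...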


[GitHub] [carbondata] jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376195242
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##########
 @@ -17,85 +17,518 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
-import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataCommand}
-import org.apache.spark.storage.StorageLevel
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
-
-case class CarbonInsertIntoCommand(
-    relation: CarbonDatasourceHadoopRelation,
-    child: LogicalPlan,
-    overwrite: Boolean,
-    partition: Map[String, Option[String]])
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.CausedBy
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/*
+* insert into without df, by just using logical plan
+*
+*/
+case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
+    tableName: String,
+    options: Map[String, String],
+    isOverwriteTable: Boolean,
+    var logicalPlan: LogicalPlan,
+    var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
+    var inputSqlString: String = null,
+    var tableInfoOp: Option[TableInfo] = None,
+    var internalOptions: Map[String, String] = Map.empty,
+    var partition: Map[String, Option[String]] = Map.empty,
+    var operationContext: OperationContext = new OperationContext)
   extends AtomicRunnableCommand {
 
-  var loadCommand: CarbonLoadDataCommand = _
+  var table: CarbonTable = _
+
+  var logicalPartitionRelation: LogicalRelation = _
+
+  var sizeInBytes: Long = _
+
+  var currPartitions: util.List[PartitionSpec] = _
+
+  var parentTablePath: String = _
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  var scanResultRdd: RDD[InternalRow] = _
+
+  var timeStampFormat: SimpleDateFormat = _
+
+  var dateFormat: SimpleDateFormat = _
+
+  var finalPartition: Map[String, Option[String]] = Map.empty
+
+  var isInsertIntoWithConverterFlow: Boolean = false
+
+  var dataFrame: DataFrame = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    setAuditTable(relation.carbonTable.getDatabaseName, relation.carbonTable.getTableName)
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    def containsLimit(plan: LogicalPlan): Boolean = {
-      plan find {
-        case limit: GlobalLimit => true
-        case other => false
-      } isDefined
+    if (!tableInfoOp.isDefined) {
+      throw new RuntimeException(" table info must be present when logical relation exist")
     }
-
+    // If logical plan is unresolved, need to convert it to resolved.
+    dataFrame = Dataset.ofRows(sparkSession, logicalPlan)
+    logicalPlan = dataFrame.queryExecution.analyzed
+    var isInsertFromTable = false
+    logicalPlan.collect {
+      case _: LogicalRelation =>
+        isInsertFromTable = true
+    }
+    // Currently projection re-ordering is based on schema ordinal,
+    // for some scenarios in alter table scenario, schema ordinal logic cannot be applied.
+    // So, sending it to old flow
+    // TODO: Handle this in future, this must use new flow.
+    if (!isInsertFromTable || isAlteredSchema(tableInfoOp.get.getFactTable)) {
+      isInsertIntoWithConverterFlow = true
+    }
+    if (isInsertIntoWithConverterFlow) {
+      return Seq.empty
+    }
+    setAuditTable(CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession), tableName)
     ThreadLocalSessionInfo
       .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
-    val isPersistEnabledUserValue = CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED,
-        CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED_DEFAULT)
-    val isPersistRequired =
-      isPersistEnabledUserValue.equalsIgnoreCase("true") || containsLimit(child)
-    val df =
-      if (isPersistRequired) {
-        LOGGER.info("Persist enabled for Insert operation")
-        Dataset.ofRows(sparkSession, child).persist(
-          StorageLevel.fromString(
-            CarbonProperties.getInstance.getInsertIntoDatasetStorageLevel))
+    val (sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition) = CommonLoadUtils
+      .processMetadataCommon(
+        sparkSession,
+        databaseNameOp,
+        tableName,
+        tableInfoOp,
+        partition)
+    this.sizeInBytes = sizeInBytes
+    this.table = table
+    this.logicalPartitionRelation = logicalPartitionRelation
+    this.finalPartition = finalPartition
+    setAuditTable(dbName, tableName)
+    Seq.empty
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (isInsertIntoWithConverterFlow) {
+      return CarbonInsertIntoWithDf(
+        databaseNameOp,
+        tableName,
+        options,
+        isOverwriteTable,
+        dimFilesPath,
+        dataFrame,
+        inputSqlString,
+        None,
+        tableInfoOp,
+        internalOptions,
+        partition).process(sparkSession)
+    }
+    val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
+    val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    carbonProperty.addProperty("zookeeper.enable.lock", "false")
+    val factPath = ""
+    currPartitions = CommonLoadUtils.getCurrentParitions(sparkSession, table)
+    CommonLoadUtils.setNumberOfCoresWhileLoading(sparkSession, carbonProperty)
+    val optionsFinal: util.Map[String, String] =
+      CommonLoadUtils.getFinalLoadOptions(
+      carbonProperty, table, options)
+    val carbonLoadModel: CarbonLoadModel = CommonLoadUtils.prepareLoadModel(
+      hadoopConf,
+      factPath,
+      optionsFinal, parentTablePath, table, isDataFrame = true, internalOptions, partition, options)
+
+    val (tf, df) = CommonLoadUtils.getTimeAndDateFormatFromLoadModel(
+      carbonLoadModel)
+    timeStampFormat = tf
+    dateFormat = df
+
+    var complexChildCount: Int = 0
+    var reArrangedIndex: Seq[Int] = Seq()
+    var selectedColumnSchema: Seq[ColumnSchema] = Seq()
+    var partitionIndex: Seq[Int] = Seq()
+
+    val columnSchema = tableInfoOp.get.getFactTable.getListOfColumns.asScala
+    val partitionInfo = tableInfoOp.get.getFactTable.getPartitionInfo
+    val partitionColumnSchema =
+      if (partitionInfo != null && partitionInfo.getColumnSchemaList.size() != 0) {
+        partitionInfo.getColumnSchemaList.asScala
+      } else {
+        null
+      }
+    val convertedStaticPartition = mutable.Map[String, AnyRef]()
+    // Remove the thread local entries of previous configurations.
+    DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
+    if (partition.nonEmpty) {
+      for (col <- partitionColumnSchema) {
+        if (partition(col.getColumnName.toLowerCase).isDefined) {
+          convertedStaticPartition(col.getColumnName.toLowerCase) =
+            CarbonScalaUtil.convertStaticPartitionToValues(partition(col.getColumnName.toLowerCase)
+              .get,
+              SparkDataTypeConverterImpl.convertCarbonToSparkDataType(col.getDataType),
+              timeStampFormat,
+              dateFormat)
+        }
+      }
+    }
+    val partitionColumnNames = if (partitionColumnSchema != null) {
+      partitionColumnSchema.map(x => x.getColumnName).toSet
+    } else {
+      null
+    }
+    // get invisible column indexes, alter table scenarios can have it before or after new column
+    // dummy measure will have ordinal -1 and it is invisible, ignore that column.
+    // alter table old columns are just invisible columns with proper ordinal
+    val invisibleIndex = columnSchema.filter(col => col.isInvisible && col.getSchemaOrdinal != -1)
+      .map(col => col.getSchemaOrdinal)
+    columnSchema.filterNot(col => col.isInvisible).foreach {
+      col =>
+        var skipPartitionColumn = false
+        if (col.getColumnName.contains(".")) {
+          // If the schema ordinal is -1,
+          // no need to consider it during shifting columns to derive new shifted ordinal
+          if (col.getSchemaOrdinal != -1) {
+            complexChildCount = complexChildCount + 1
+          }
+        } else {
+          // get number of invisible index count before this column
+          val invisibleIndexCount = invisibleIndex.count(index => index < col.getSchemaOrdinal)
+          if (col.getDataType.isComplexType) {
+            // Calculate re-arrange index by ignoring the complex child count.
+            // As projection will have only parent columns
+            reArrangedIndex = reArrangedIndex :+
+                              (col.getSchemaOrdinal - complexChildCount - invisibleIndexCount)
+          } else {
+            if (partitionColumnNames != null && partitionColumnNames.contains(col.getColumnName)) {
+              partitionIndex = partitionIndex :+ (col.getSchemaOrdinal - invisibleIndexCount)
+              skipPartitionColumn = true
+            } else {
+              reArrangedIndex = reArrangedIndex :+ (col.getSchemaOrdinal - invisibleIndexCount)
+            }
+          }
+          if (!skipPartitionColumn) {
+            selectedColumnSchema = selectedColumnSchema :+ col
+          }
+        }
+    }
+    if (partitionColumnSchema != null) {
+      // keep partition columns in the end
+      selectedColumnSchema = selectedColumnSchema ++ partitionColumnSchema
+    }
+    if (partitionIndex.nonEmpty) {
+      // keep partition columns in the end and in the original create order
+      reArrangedIndex = reArrangedIndex ++ partitionIndex.sortBy(x => x)
+    }
+    var processedProject: Boolean = false
+    // check first node is the projection or not
+    logicalPlan match {
+      case _: Project =>
+        // project is already present as first node
+      case _ =>
+        // If project is not present, add the projection to re-arrange it
+        logicalPlan = Project(logicalPlan.output, logicalPlan)
+    }
+    // Re-arrange the project as per columnSchema
+    val newLogicalPlan = logicalPlan.transformDown {
+      //      case logicalRelation: LogicalRelation =>
+      //        getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalRelation)
+      //      case hiveRelation: HiveTableRelation =>
+      //        getReArrangedSchemaHiveRelation(reArrangedIndex, hiveRelation)
+      case p: Project =>
+        var oldProjectionList = p.projectList
+        if (!processedProject) {
+          if (partition.nonEmpty) {
+            // partition keyword is present in insert and
+            // select query partition projections may not be same as create order.
+            // So, bring to create table order
+            val dynamicPartition = partition.filterNot(entry => entry._2.isDefined)
+            var index = 0
+            val map = mutable.Map[String, Int]()
+            for (part <- dynamicPartition) {
+              map(part._1) = index
+              index = index + 1
+            }
+            var tempList = oldProjectionList.take(oldProjectionList.size - dynamicPartition.size)
+            val partitionList = oldProjectionList.takeRight(dynamicPartition.size)
+            val partitionSchema = table.getPartitionInfo.getColumnSchemaList.asScala
+            for (partitionCol <- partitionSchema) {
+              if (map.get(partitionCol.getColumnName).isDefined) {
+                tempList = tempList :+ partitionList(map(partitionCol.getColumnName))
+              }
+            }
+            oldProjectionList = tempList
+          }
+          if (reArrangedIndex.size != oldProjectionList.size) {
+            // for non-partition table columns must match
+            if (partition.isEmpty) {
+              throw new AnalysisException(
+                s"Cannot insert into table $tableName because the number of columns are " +
+                s"different: " +
+                s"need ${ reArrangedIndex.size } columns, " +
+                s"but query has ${ oldProjectionList.size } columns.")
+            } else {
+              if (reArrangedIndex.size - oldProjectionList.size != convertedStaticPartition.size) {
+                throw new AnalysisException(
+                  s"Cannot insert into table $tableName because the number of columns are " +
+                  s"different: need ${ reArrangedIndex.size } columns, " +
+                  s"but query has ${ oldProjectionList.size } columns.")
+              } else {
+                // TODO: For partition case, remaining projections need to validate ?
+              }
+            }
+          }
+          var newProjectionList: Seq[NamedExpression] = Seq.empty
+          var i = 0
+          while (i < reArrangedIndex.size) {
+            // column schema is already has sortColumns-dimensions-measures. Collect the ordinal &
+            // re-arrange the projection in the same order
+            if (partition.nonEmpty &&
+                convertedStaticPartition.contains(selectedColumnSchema(i).getColumnName
+                  .toLowerCase())) {
+              // If column schema present in partitionSchema means it is a static partition,
+              // then add a value literal expression in the project.
+              val value = convertedStaticPartition(selectedColumnSchema(i).getColumnName
+                .toLowerCase())
+              newProjectionList = newProjectionList :+
+                                  Alias(new Literal(value,
+                                    SparkDataTypeConverterImpl.convertCarbonToSparkDataType(
+                                      selectedColumnSchema(i).getDataType)), value.toString)(
+                                    NamedExpression.newExprId,
+                                    None,
+                                    None).asInstanceOf[NamedExpression]
+            } else {
+              // If column schema NOT present in partition column,
+              // get projection column mapping its ordinal.
+              if (partition.contains(selectedColumnSchema(i).getColumnName.toLowerCase())) {
+                // static + dynamic partition case: the dynamic partition
+                // ordinal exceeds the projection size, so shift it back by
+                // the static partition count.
+                newProjectionList = newProjectionList :+
+                                    oldProjectionList(
+                                      reArrangedIndex(i) - convertedStaticPartition.size)
+              } else {
+                newProjectionList = newProjectionList :+
+                                    oldProjectionList(reArrangedIndex(i))
+              }
+            }
+            i = i + 1
+          }
+          processedProject = true
+          Project(newProjectionList, p.child)
+        } else {
+          p
+        }
+    }
+    scanResultRdd = sparkSession.sessionState.executePlan(newLogicalPlan).toRdd
+    if (logicalPartitionRelation != null) {
+      logicalPartitionRelation =
+        getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalPartitionRelation)
+    }
+    // Delete stale segment folders that are not in table status but are physically present in
+    // the Fact folder
+    LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName")
+    TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
+    var isUpdateTableStatusRequired = false
+    val uuid = ""
+    try {
+      val (tableDataMaps, dataMapOperationContext) =
+        CommonLoadUtils.firePreLoadEvents(sparkSession,
+          carbonLoadModel,
+          uuid,
+          table,
+          isOverwriteTable,
+          operationContext)
+      // The system has to partition the data first and then call the data load
+      LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
+      // Clean up the old invalid segment data before creating a new entry for new load.
+      SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions)
+      // add the start entry for the new load in the table status file
+      if (!table.isHivePartitionTable) {
+        CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
+          carbonLoadModel,
+          isOverwriteTable)
+        isUpdateTableStatusRequired = true
+      }
+      if (isOverwriteTable) {
+        LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
+      }
+      // Create the table and metadata folders if they do not exist
+      if (carbonLoadModel.isCarbonTransactionalTable) {
+        val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
+        if (!FileFactory.isFileExist(metadataDirectoryPath)) {
+          FileFactory.mkdirs(metadataDirectoryPath)
+        }
       } else {
-        Dataset.ofRows(sparkSession, child)
+        carbonLoadModel.setSegmentId(System.nanoTime().toString)
       }
-    val header = relation.tableSchema.get.fields.map(_.name).mkString(",")
-    loadCommand = CarbonLoadDataCommand(
-      databaseNameOp = Some(relation.carbonRelation.databaseName),
-      tableName = relation.carbonRelation.tableName,
-      factPathFromUser = null,
-      dimFilesPath = Seq(),
-      options = scala.collection.immutable.Map("fileheader" -> header),
-      isOverwriteTable = overwrite,
-      inputSqlString = null,
-      dataFrame = Some(df),
-      updateModel = None,
-      tableInfoOp = None,
-      internalOptions = Map.empty,
-      partition = partition)
-    val load = loadCommand.processMetadata(sparkSession)
-    if (isPersistRequired) {
-      df.unpersist()
-    }
-    load
+      val partitionStatus = SegmentStatus.SUCCESS
+
+      val loadParams = CarbonLoadParams(sparkSession,
+        tableName,
+        sizeInBytes,
+        isOverwriteTable,
+        carbonLoadModel,
+        hadoopConf,
+        logicalPartitionRelation,
+        dateFormat,
+        timeStampFormat,
+        options,
+        finalPartition,
+        currPartitions,
+        partitionStatus,
+        None,
+        Some(scanResultRdd),
+        None,
+        operationContext)
+
+      LOGGER.info("Sort Scope : " + carbonLoadModel.getSortScope)
+      val (rows, loadResult) = insertData(loadParams)
+      val info = CommonLoadUtils.makeAuditInfo(loadResult)
+      setAuditInfo(info)
+      CommonLoadUtils.firePostLoadEvents(sparkSession,
+        carbonLoadModel,
+        tableDataMaps,
+        dataMapOperationContext,
+        table,
+        operationContext)
+    } catch {
+      case CausedBy(ex: NoRetryException) =>
+        // update the load entry in the table status file, changing the status to marked-for-delete
+        if (isUpdateTableStatusRequired) {
+          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
+        }
+        LOGGER.error(s"Dataload failure for $dbName.$tableName", ex)
+        throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
+      // In case of an event-related exception
+      case preEventEx: PreEventException =>
+        LOGGER.error(s"Dataload failure for $dbName.$tableName", preEventEx)
+        throw new AnalysisException(preEventEx.getMessage)
+      case ex: Exception =>
+        LOGGER.error(ex)
+        // update the load entry in the table status file, changing the status to marked-for-delete
+        if (isUpdateTableStatusRequired) {
+          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
+        }
+        throw ex
+    }
+    Seq.empty
   }
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
-    if (null != loadCommand) {
-      val rows = loadCommand.processData(sparkSession)
-      setAuditInfo(loadCommand.auditInfo)
-      rows
+
+  def getReArrangedSchemaLogicalRelation(reArrangedIndex: Seq[Int],
 
 Review comment:
   add comment to explain the logic
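
The logic in question is an index permutation over the relation's schema
fields and output attributes. A minimal sketch of the idea in isolation,
with hypothetical field names (not code from the PR):

    // Permute query-order fields into create-table order using a
    // precomputed index, as getReArrangedSchemaLogicalRelation does
    // for both schema fields and output attributes.
    val fields = Seq("city", "id", "name")  // projection order (hypothetical)
    val reArrangedIndex = Seq(1, 2, 0)      // target create-table order
    val reArranged = reArrangedIndex.map(fields(_))
    // reArranged == Seq("id", "name", "city")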


[GitHub] [carbondata] jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376195414
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##########
 @@ -17,85 +17,518 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
-import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataCommand}
-import org.apache.spark.storage.StorageLevel
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
-
-case class CarbonInsertIntoCommand(
-    relation: CarbonDatasourceHadoopRelation,
-    child: LogicalPlan,
-    overwrite: Boolean,
-    partition: Map[String, Option[String]])
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.CausedBy
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * Insert into command that works directly on the logical plan,
+ * without materializing a DataFrame.
+ */
+case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
+    tableName: String,
+    options: Map[String, String],
+    isOverwriteTable: Boolean,
+    var logicalPlan: LogicalPlan,
+    var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
+    var inputSqlString: String = null,
+    var tableInfoOp: Option[TableInfo] = None,
+    var internalOptions: Map[String, String] = Map.empty,
+    var partition: Map[String, Option[String]] = Map.empty,
+    var operationContext: OperationContext = new OperationContext)
   extends AtomicRunnableCommand {
 
-  var loadCommand: CarbonLoadDataCommand = _
+  var table: CarbonTable = _
+
+  var logicalPartitionRelation: LogicalRelation = _
+
+  var sizeInBytes: Long = _
+
+  var currPartitions: util.List[PartitionSpec] = _
+
+  var parentTablePath: String = _
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  var scanResultRdd: RDD[InternalRow] = _
+
+  var timeStampFormat: SimpleDateFormat = _
+
+  var dateFormat: SimpleDateFormat = _
+
+  var finalPartition: Map[String, Option[String]] = Map.empty
+
+  var isInsertIntoWithConverterFlow: Boolean = false
+
+  var dataFrame: DataFrame = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    setAuditTable(relation.carbonTable.getDatabaseName, relation.carbonTable.getTableName)
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    def containsLimit(plan: LogicalPlan): Boolean = {
-      plan find {
-        case limit: GlobalLimit => true
-        case other => false
-      } isDefined
+    if (tableInfoOp.isEmpty) {
+      throw new RuntimeException("table info must be present when logical relation exists")
     }
-
+    // If the logical plan is unresolved, convert it to a resolved plan.
+    dataFrame = Dataset.ofRows(sparkSession, logicalPlan)
+    logicalPlan = dataFrame.queryExecution.analyzed
+    var isInsertFromTable = false
+    logicalPlan.collect {
+      case _: LogicalRelation =>
+        isInsertFromTable = true
+    }
+    // Projection re-ordering is currently based on the schema ordinal.
+    // For some alter-table scenarios the schema-ordinal logic cannot be
+    // applied, so such cases are sent to the old flow.
+    // TODO: handle this in the future; it should use the new flow.
+    if (!isInsertFromTable || isAlteredSchema(tableInfoOp.get.getFactTable)) {
+      isInsertIntoWithConverterFlow = true
+    }
+    if (isInsertIntoWithConverterFlow) {
+      return Seq.empty
+    }
+    setAuditTable(CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession), tableName)
     ThreadLocalSessionInfo
       .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
-    val isPersistEnabledUserValue = CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED,
-        CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED_DEFAULT)
-    val isPersistRequired =
-      isPersistEnabledUserValue.equalsIgnoreCase("true") || containsLimit(child)
-    val df =
-      if (isPersistRequired) {
-        LOGGER.info("Persist enabled for Insert operation")
-        Dataset.ofRows(sparkSession, child).persist(
-          StorageLevel.fromString(
-            CarbonProperties.getInstance.getInsertIntoDatasetStorageLevel))
+    val (sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition) = CommonLoadUtils
+      .processMetadataCommon(
+        sparkSession,
+        databaseNameOp,
+        tableName,
+        tableInfoOp,
+        partition)
+    this.sizeInBytes = sizeInBytes
+    this.table = table
+    this.logicalPartitionRelation = logicalPartitionRelation
+    this.finalPartition = finalPartition
+    setAuditTable(dbName, tableName)
+    Seq.empty
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (isInsertIntoWithConverterFlow) {
+      return CarbonInsertIntoWithDf(
+        databaseNameOp,
+        tableName,
+        options,
+        isOverwriteTable,
+        dimFilesPath,
+        dataFrame,
+        inputSqlString,
+        None,
+        tableInfoOp,
+        internalOptions,
+        partition).process(sparkSession)
+    }
+    val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
+    val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    carbonProperty.addProperty("zookeeper.enable.lock", "false")
+    val factPath = ""
+    currPartitions = CommonLoadUtils.getCurrentParitions(sparkSession, table)
+    CommonLoadUtils.setNumberOfCoresWhileLoading(sparkSession, carbonProperty)
+    val optionsFinal: util.Map[String, String] =
+      CommonLoadUtils.getFinalLoadOptions(carbonProperty, table, options)
+    val carbonLoadModel: CarbonLoadModel = CommonLoadUtils.prepareLoadModel(
+      hadoopConf,
+      factPath,
+      optionsFinal, parentTablePath, table, isDataFrame = true, internalOptions, partition, options)
+
+    val (tf, df) = CommonLoadUtils.getTimeAndDateFormatFromLoadModel(
+      carbonLoadModel)
+    timeStampFormat = tf
+    dateFormat = df
+
+    var complexChildCount: Int = 0
+    var reArrangedIndex: Seq[Int] = Seq()
+    var selectedColumnSchema: Seq[ColumnSchema] = Seq()
+    var partitionIndex: Seq[Int] = Seq()
+
+    val columnSchema = tableInfoOp.get.getFactTable.getListOfColumns.asScala
+    val partitionInfo = tableInfoOp.get.getFactTable.getPartitionInfo
+    val partitionColumnSchema =
+      if (partitionInfo != null && partitionInfo.getColumnSchemaList.size() != 0) {
+        partitionInfo.getColumnSchemaList.asScala
+      } else {
+        null
+      }
+    val convertedStaticPartition = mutable.Map[String, AnyRef]()
+    // Replace the thread-local data type converter left by previous configurations.
+    DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
+    if (partition.nonEmpty) {
+      for (col <- partitionColumnSchema) {
+        if (partition(col.getColumnName.toLowerCase).isDefined) {
+          convertedStaticPartition(col.getColumnName.toLowerCase) =
+            CarbonScalaUtil.convertStaticPartitionToValues(partition(col.getColumnName.toLowerCase)
+              .get,
+              SparkDataTypeConverterImpl.convertCarbonToSparkDataType(col.getDataType),
+              timeStampFormat,
+              dateFormat)
+        }
+      }
+    }
+    val partitionColumnNames = if (partitionColumnSchema != null) {
+      partitionColumnSchema.map(x => x.getColumnName).toSet
+    } else {
+      null
+    }
+    // Get the invisible column indexes; alter-table scenarios can place them
+    // before or after a new column. The dummy measure has ordinal -1 and is
+    // invisible, so ignore that column. Old columns from alter table are just
+    // invisible columns with a proper ordinal.
+    val invisibleIndex = columnSchema.filter(col => col.isInvisible && col.getSchemaOrdinal != -1)
+      .map(col => col.getSchemaOrdinal)
+    columnSchema.filterNot(col => col.isInvisible).foreach {
+      col =>
+        var skipPartitionColumn = false
+        if (col.getColumnName.contains(".")) {
+          // If the schema ordinal is -1, it need not be considered when
+          // shifting columns to derive the new shifted ordinal.
+          if (col.getSchemaOrdinal != -1) {
+            complexChildCount = complexChildCount + 1
+          }
+        } else {
+          // count the invisible columns that appear before this column
+          val invisibleIndexCount = invisibleIndex.count(index => index < col.getSchemaOrdinal)
+          if (col.getDataType.isComplexType) {
+            // Calculate the re-arranged index by excluding the complex child
+            // count, as the projection will contain only the parent columns.
+            reArrangedIndex = reArrangedIndex :+
+                              (col.getSchemaOrdinal - complexChildCount - invisibleIndexCount)
+          } else {
+            if (partitionColumnNames != null && partitionColumnNames.contains(col.getColumnName)) {
+              partitionIndex = partitionIndex :+ (col.getSchemaOrdinal - invisibleIndexCount)
+              skipPartitionColumn = true
+            } else {
+              reArrangedIndex = reArrangedIndex :+ (col.getSchemaOrdinal - invisibleIndexCount)
+            }
+          }
+          if (!skipPartitionColumn) {
+            selectedColumnSchema = selectedColumnSchema :+ col
+          }
+        }
+    }
+    if (partitionColumnSchema != null) {
+      // keep partition columns in the end
+      selectedColumnSchema = selectedColumnSchema ++ partitionColumnSchema
+    }
+    if (partitionIndex.nonEmpty) {
+      // keep partition columns in the end and in the original create order
+      reArrangedIndex = reArrangedIndex ++ partitionIndex.sortBy(x => x)
+    }
+    var processedProject: Boolean = false
+    // check whether the first node is a projection
+    logicalPlan match {
+      case _: Project =>
+        // a project is already present as the first node
+      case _ =>
+        // If a project is not present, add one so it can be re-arranged
+        logicalPlan = Project(logicalPlan.output, logicalPlan)
+    }
+    // Re-arrange the project as per columnSchema
+    val newLogicalPlan = logicalPlan.transformDown {
+      //      case logicalRelation: LogicalRelation =>
+      //        getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalRelation)
+      //      case hiveRelation: HiveTableRelation =>
+      //        getReArrangedSchemaHiveRelation(reArrangedIndex, hiveRelation)
+      case p: Project =>
+        var oldProjectionList = p.projectList
+        if (!processedProject) {
+          if (partition.nonEmpty) {
+            // When the partition keyword is present in the insert, the select
+            // query's partition projections may not match the create-table
+            // order. So, re-arrange them into the create-table order.
+            val dynamicPartition = partition.filterNot(entry => entry._2.isDefined)
+            var index = 0
+            val map = mutable.Map[String, Int]()
+            for (part <- dynamicPartition) {
+              map(part._1) = index
+              index = index + 1
+            }
+            var tempList = oldProjectionList.take(oldProjectionList.size - dynamicPartition.size)
+            val partitionList = oldProjectionList.takeRight(dynamicPartition.size)
+            val partitionSchema = table.getPartitionInfo.getColumnSchemaList.asScala
+            for (partitionCol <- partitionSchema) {
+              if (map.get(partitionCol.getColumnName).isDefined) {
+                tempList = tempList :+ partitionList(map(partitionCol.getColumnName))
+              }
+            }
+            oldProjectionList = tempList
+          }
+          if (reArrangedIndex.size != oldProjectionList.size) {
+            // for a non-partition table, the column count must match
+            if (partition.isEmpty) {
+              throw new AnalysisException(
+                s"Cannot insert into table $tableName because the number of columns are " +
+                s"different: " +
+                s"need ${ reArrangedIndex.size } columns, " +
+                s"but query has ${ oldProjectionList.size } columns.")
+            } else {
+              if (reArrangedIndex.size - oldProjectionList.size != convertedStaticPartition.size) {
+                throw new AnalysisException(
+                  s"Cannot insert into table $tableName because the number of columns are " +
+                  s"different: need ${ reArrangedIndex.size } columns, " +
+                  s"but query has ${ oldProjectionList.size } columns.")
+              } else {
+                // TODO: for the partition case, do the remaining projections need validation?
+              }
+            }
+          }
+          var newProjectionList: Seq[NamedExpression] = Seq.empty
+          var i = 0
+          while (i < reArrangedIndex.size) {
+            // column schema already has the sortColumns-dimensions-measures
+            // order. Collect the ordinals and re-arrange the projection in
+            // the same order.
+            if (partition.nonEmpty &&
+                convertedStaticPartition.contains(selectedColumnSchema(i).getColumnName
+                  .toLowerCase())) {
+              // If the column is present in the static partition spec,
+              // add a literal value expression to the project.
+              val value = convertedStaticPartition(selectedColumnSchema(i).getColumnName
+                .toLowerCase())
+              newProjectionList = newProjectionList :+
+                                  Alias(new Literal(value,
+                                    SparkDataTypeConverterImpl.convertCarbonToSparkDataType(
+                                      selectedColumnSchema(i).getDataType)), value.toString)(
+                                    NamedExpression.newExprId,
+                                    None,
+                                    None).asInstanceOf[NamedExpression]
+            } else {
+              // If the column is NOT a static partition column,
+              // map the projection column by its ordinal.
+              if (partition.contains(selectedColumnSchema(i).getColumnName.toLowerCase())) {
+                // static + dynamic partition case: the dynamic partition
+                // ordinal exceeds the projection size, so shift it back by
+                // the static partition count.
+                newProjectionList = newProjectionList :+
+                                    oldProjectionList(
+                                      reArrangedIndex(i) - convertedStaticPartition.size)
+              } else {
+                newProjectionList = newProjectionList :+
+                                    oldProjectionList(reArrangedIndex(i))
+              }
+            }
+            i = i + 1
+          }
+          processedProject = true
+          Project(newProjectionList, p.child)
+        } else {
+          p
+        }
+    }
+    scanResultRdd = sparkSession.sessionState.executePlan(newLogicalPlan).toRdd
+    if (logicalPartitionRelation != null) {
+      logicalPartitionRelation =
+        getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalPartitionRelation)
+    }
+    // Delete stale segment folders that are not in table status but are physically present in
+    // the Fact folder
+    LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName")
+    TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
+    var isUpdateTableStatusRequired = false
+    val uuid = ""
+    try {
+      val (tableDataMaps, dataMapOperationContext) =
+        CommonLoadUtils.firePreLoadEvents(sparkSession,
+          carbonLoadModel,
+          uuid,
+          table,
+          isOverwriteTable,
+          operationContext)
+      // The system has to partition the data first and then call the data load
+      LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
+      // Clean up the old invalid segment data before creating a new entry for new load.
+      SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions)
+      // add the start entry for the new load in the table status file
+      if (!table.isHivePartitionTable) {
+        CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
+          carbonLoadModel,
+          isOverwriteTable)
+        isUpdateTableStatusRequired = true
+      }
+      if (isOverwriteTable) {
+        LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
+      }
+      // Create the table and metadata folders if they do not exist
+      if (carbonLoadModel.isCarbonTransactionalTable) {
+        val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
+        if (!FileFactory.isFileExist(metadataDirectoryPath)) {
+          FileFactory.mkdirs(metadataDirectoryPath)
+        }
       } else {
-        Dataset.ofRows(sparkSession, child)
+        carbonLoadModel.setSegmentId(System.nanoTime().toString)
       }
-    val header = relation.tableSchema.get.fields.map(_.name).mkString(",")
-    loadCommand = CarbonLoadDataCommand(
-      databaseNameOp = Some(relation.carbonRelation.databaseName),
-      tableName = relation.carbonRelation.tableName,
-      factPathFromUser = null,
-      dimFilesPath = Seq(),
-      options = scala.collection.immutable.Map("fileheader" -> header),
-      isOverwriteTable = overwrite,
-      inputSqlString = null,
-      dataFrame = Some(df),
-      updateModel = None,
-      tableInfoOp = None,
-      internalOptions = Map.empty,
-      partition = partition)
-    val load = loadCommand.processMetadata(sparkSession)
-    if (isPersistRequired) {
-      df.unpersist()
-    }
-    load
+      val partitionStatus = SegmentStatus.SUCCESS
+
+      val loadParams = CarbonLoadParams(sparkSession,
+        tableName,
+        sizeInBytes,
+        isOverwriteTable,
+        carbonLoadModel,
+        hadoopConf,
+        logicalPartitionRelation,
+        dateFormat,
+        timeStampFormat,
+        options,
+        finalPartition,
+        currPartitions,
+        partitionStatus,
+        None,
+        Some(scanResultRdd),
+        None,
+        operationContext)
+
+      LOGGER.info("Sort Scope : " + carbonLoadModel.getSortScope)
+      val (rows, loadResult) = insertData(loadParams)
+      val info = CommonLoadUtils.makeAuditInfo(loadResult)
+      setAuditInfo(info)
+      CommonLoadUtils.firePostLoadEvents(sparkSession,
+        carbonLoadModel,
+        tableDataMaps,
+        dataMapOperationContext,
+        table,
+        operationContext)
+    } catch {
+      case CausedBy(ex: NoRetryException) =>
+        // update the load entry in the table status file, changing the status to marked-for-delete
+        if (isUpdateTableStatusRequired) {
+          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
+        }
+        LOGGER.error(s"Dataload failure for $dbName.$tableName", ex)
+        throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
+      // In case of an event-related exception
+      case preEventEx: PreEventException =>
+        LOGGER.error(s"Dataload failure for $dbName.$tableName", preEventEx)
+        throw new AnalysisException(preEventEx.getMessage)
+      case ex: Exception =>
+        LOGGER.error(ex)
+        // update the load entry in the table status file, changing the status to marked-for-delete
+        if (isUpdateTableStatusRequired) {
+          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
+        }
+        throw ex
+    }
+    Seq.empty
   }
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
-    if (null != loadCommand) {
-      val rows = loadCommand.processData(sparkSession)
-      setAuditInfo(loadCommand.auditInfo)
-      rows
+
+  def getReArrangedSchemaLogicalRelation(reArrangedIndex: Seq[Int],
+      logicalRelation: LogicalRelation): LogicalRelation = {
+    if (reArrangedIndex.size != logicalRelation.schema.size) {
+      throw new AnalysisException(
+        s"Cannot insert into table $tableName because the number of columns are different: " +
+        s"need ${ reArrangedIndex.size } columns, " +
+        s"but query has ${ logicalRelation.schema.size } columns.")
+    }
+    val reArrangedFields = new Array[StructField](logicalRelation.schema.size)
+    val reArrangedAttributes = new Array[AttributeReference](logicalRelation.schema.size)
+    val fields = logicalRelation.schema.fields
+    val output = logicalRelation.output
+    var i = 0
+    for (index <- reArrangedIndex) {
+      reArrangedFields(i) = fields(index)
+      reArrangedAttributes(i) = output(index)
+      i = i + 1
+    }
+    val catalogTable = logicalRelation.catalogTable
+      .get
+      .copy(schema = new StructType(reArrangedFields))
+    logicalRelation.copy(logicalRelation.relation,
+      reArrangedAttributes,
+      Some(catalogTable))
+  }
+
+  def getReArrangedSchemaHiveRelation(reArrangedIndex: Seq[Int],
+      hiveTableRelation: HiveTableRelation): HiveTableRelation = {
+    if (reArrangedIndex.size != hiveTableRelation.schema.size) {
+      throw new AnalysisException(
+        s"Cannot insert into table $tableName because the number of columns are different: " +
+        s"need ${ reArrangedIndex.size } columns, " +
+        s"but query has ${ hiveTableRelation.schema.size } columns.")
+    }
+    val reArrangedFields = new Array[StructField](hiveTableRelation.schema.size)
+    val reArrangedAttributes = new Array[AttributeReference](hiveTableRelation.schema.size)
+    val fields = hiveTableRelation.schema.fields
+    val output = hiveTableRelation.output
+    var i = 0
+    for (index <- reArrangedIndex) {
+      reArrangedFields(i) = fields(index)
+      reArrangedAttributes(i) = output(index)
+      i = i + 1
+    }
+    val catalogTable = hiveTableRelation.tableMeta.copy(schema = new StructType(reArrangedFields))
+    hiveTableRelation.copy(catalogTable,
+      reArrangedAttributes)
+  }
+
+  def insertData(loadParams: CarbonLoadParams): (Seq[Row], LoadMetadataDetails) = {
+    var rows = Seq.empty[Row]
+    val table = loadParams.carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    var loadResult: LoadMetadataDetails = null
+    if (table.isHivePartitionTable) {
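+      // partitioned tables go through the partition-aware load path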
+      rows = CommonLoadUtils.loadDataWithPartition(loadParams)
     } else {
-      Seq.empty
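+      // non-partitioned tables hand the scan-result RDD to the RDD factory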
+      loadResult = CarbonDataRDDFactory.loadCarbonData(loadParams.sparkSession.sqlContext,
+        loadParams.carbonLoadModel,
+        loadParams.partitionStatus,
+        isOverwriteTable,
+        loadParams.hadoopConf,
+        None,
+        loadParams.scanResultRDD,
+        None,
+        operationContext)
     }
+    (rows, loadResult)
   }
 
   override protected def opName: String = {
-    if (overwrite) {
+    if (isOverwriteTable) {
       "INSERT OVERWRITE"
     } else {
       "INSERT INTO"
     }
   }
+
+  private def isAlteredSchema(tableSchema: TableSchema): Boolean = {
+    if (tableInfoOp.get.getFactTable.getSchemaEvolution != null) {
+      for (entry: SchemaEvolutionEntry <- tableInfoOp
 
 Review comment:
   do not use `for` statement, use tableInfoOp....foreach
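
A minimal sketch of the suggested rewrite, assuming the loop only needs to
detect whether any schema evolution entry added or removed columns (the
accessor names on SchemaEvolution and SchemaEvolutionEntry are assumptions,
not verified against the PR):

    import scala.collection.JavaConverters._

    // Hypothetical rewrite of isAlteredSchema without a `for` statement:
    def isAlteredSchema(tableSchema: TableSchema): Boolean = {
      Option(tableSchema.getSchemaEvolution).exists { evolution =>
        evolution.getSchemaEvolutionEntryList.asScala.exists { entry =>
          // any entry that added or removed columns marks the schema as altered
          (entry.getAdded != null && !entry.getAdded.isEmpty) ||
          (entry.getRemoved != null && !entry.getRemoved.isEmpty)
        }
      }
    }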


[GitHub] [carbondata] jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376195414
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##########
 @@ -17,85 +17,518 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
-import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataCommand}
-import org.apache.spark.storage.StorageLevel
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
-
-case class CarbonInsertIntoCommand(
-    relation: CarbonDatasourceHadoopRelation,
-    child: LogicalPlan,
-    overwrite: Boolean,
-    partition: Map[String, Option[String]])
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.CausedBy
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * Insert into command that works directly on the logical plan,
+ * without materializing a DataFrame.
+ */
+case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
+    tableName: String,
+    options: Map[String, String],
+    isOverwriteTable: Boolean,
+    var logicalPlan: LogicalPlan,
+    var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
+    var inputSqlString: String = null,
+    var tableInfoOp: Option[TableInfo] = None,
+    var internalOptions: Map[String, String] = Map.empty,
+    var partition: Map[String, Option[String]] = Map.empty,
+    var operationContext: OperationContext = new OperationContext)
   extends AtomicRunnableCommand {
 
-  var loadCommand: CarbonLoadDataCommand = _
+  var table: CarbonTable = _
+
+  var logicalPartitionRelation: LogicalRelation = _
+
+  var sizeInBytes: Long = _
+
+  var currPartitions: util.List[PartitionSpec] = _
+
+  var parentTablePath: String = _
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  var scanResultRdd: RDD[InternalRow] = _
+
+  var timeStampFormat: SimpleDateFormat = _
+
+  var dateFormat: SimpleDateFormat = _
+
+  var finalPartition: Map[String, Option[String]] = Map.empty
+
+  var isInsertIntoWithConverterFlow: Boolean = false
+
+  var dataFrame: DataFrame = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    setAuditTable(relation.carbonTable.getDatabaseName, relation.carbonTable.getTableName)
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    def containsLimit(plan: LogicalPlan): Boolean = {
-      plan find {
-        case limit: GlobalLimit => true
-        case other => false
-      } isDefined
+    if (tableInfoOp.isEmpty) {
+      throw new RuntimeException("table info must be present when logical relation exists")
     }
-
+    // If the logical plan is unresolved, convert it to a resolved plan.
+    dataFrame = Dataset.ofRows(sparkSession, logicalPlan)
+    logicalPlan = dataFrame.queryExecution.analyzed
+    var isInsertFromTable = false
+    logicalPlan.collect {
+      case _: LogicalRelation =>
+        isInsertFromTable = true
+    }
+    // Projection re-ordering is currently based on the schema ordinal.
+    // For some alter-table scenarios the schema-ordinal logic cannot be
+    // applied, so such cases are sent to the old flow.
+    // TODO: handle this in the future; it should use the new flow.
+    if (!isInsertFromTable || isAlteredSchema(tableInfoOp.get.getFactTable)) {
+      isInsertIntoWithConverterFlow = true
+    }
+    if (isInsertIntoWithConverterFlow) {
+      return Seq.empty
+    }
+    setAuditTable(CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession), tableName)
     ThreadLocalSessionInfo
       .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
-    val isPersistEnabledUserValue = CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED,
-        CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED_DEFAULT)
-    val isPersistRequired =
-      isPersistEnabledUserValue.equalsIgnoreCase("true") || containsLimit(child)
-    val df =
-      if (isPersistRequired) {
-        LOGGER.info("Persist enabled for Insert operation")
-        Dataset.ofRows(sparkSession, child).persist(
-          StorageLevel.fromString(
-            CarbonProperties.getInstance.getInsertIntoDatasetStorageLevel))
+    val (sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition) = CommonLoadUtils
+      .processMetadataCommon(
+        sparkSession,
+        databaseNameOp,
+        tableName,
+        tableInfoOp,
+        partition)
+    this.sizeInBytes = sizeInBytes
+    this.table = table
+    this.logicalPartitionRelation = logicalPartitionRelation
+    this.finalPartition = finalPartition
+    setAuditTable(dbName, tableName)
+    Seq.empty
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (isInsertIntoWithConverterFlow) {
+      return CarbonInsertIntoWithDf(
+        databaseNameOp,
+        tableName,
+        options,
+        isOverwriteTable,
+        dimFilesPath,
+        dataFrame,
+        inputSqlString,
+        None,
+        tableInfoOp,
+        internalOptions,
+        partition).process(sparkSession)
+    }
+    val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
+    val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    carbonProperty.addProperty("zookeeper.enable.lock", "false")
+    val factPath = ""
+    currPartitions = CommonLoadUtils.getCurrentParitions(sparkSession, table)
+    CommonLoadUtils.setNumberOfCoresWhileLoading(sparkSession, carbonProperty)
+    val optionsFinal: util.Map[String, String] =
+      CommonLoadUtils.getFinalLoadOptions(carbonProperty, table, options)
+    val carbonLoadModel: CarbonLoadModel = CommonLoadUtils.prepareLoadModel(
+      hadoopConf,
+      factPath,
+      optionsFinal, parentTablePath, table, isDataFrame = true, internalOptions, partition, options)
+
+    val (tf, df) = CommonLoadUtils.getTimeAndDateFormatFromLoadModel(
+      carbonLoadModel)
+    timeStampFormat = tf
+    dateFormat = df
+
+    var complexChildCount: Int = 0
+    var reArrangedIndex: Seq[Int] = Seq()
+    var selectedColumnSchema: Seq[ColumnSchema] = Seq()
+    var partitionIndex: Seq[Int] = Seq()
+
+    val columnSchema = tableInfoOp.get.getFactTable.getListOfColumns.asScala
+    val partitionInfo = tableInfoOp.get.getFactTable.getPartitionInfo
+    val partitionColumnSchema =
+      if (partitionInfo != null && partitionInfo.getColumnSchemaList.size() != 0) {
+        partitionInfo.getColumnSchemaList.asScala
+      } else {
+        null
+      }
+    val convertedStaticPartition = mutable.Map[String, AnyRef]()
+    // Replace the thread-local data type converter left by previous configurations.
+    DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
+    if (partition.nonEmpty) {
+      for (col <- partitionColumnSchema) {
+        if (partition(col.getColumnName.toLowerCase).isDefined) {
+          convertedStaticPartition(col.getColumnName.toLowerCase) =
+            CarbonScalaUtil.convertStaticPartitionToValues(partition(col.getColumnName.toLowerCase)
+              .get,
+              SparkDataTypeConverterImpl.convertCarbonToSparkDataType(col.getDataType),
+              timeStampFormat,
+              dateFormat)
+        }
+      }
+    }
+    val partitionColumnNames = if (partitionColumnSchema != null) {
+      partitionColumnSchema.map(x => x.getColumnName).toSet
+    } else {
+      null
+    }
+    // Get the invisible column indexes; alter-table scenarios can place them
+    // before or after a new column. The dummy measure has ordinal -1 and is
+    // invisible, so ignore that column. Old columns from alter table are just
+    // invisible columns with a proper ordinal.
+    val invisibleIndex = columnSchema.filter(col => col.isInvisible && col.getSchemaOrdinal != -1)
+      .map(col => col.getSchemaOrdinal)
+    columnSchema.filterNot(col => col.isInvisible).foreach {
+      col =>
+        var skipPartitionColumn = false
+        if (col.getColumnName.contains(".")) {
+          // If the schema ordinal is -1, it need not be considered when
+          // shifting columns to derive the new shifted ordinal.
+          if (col.getSchemaOrdinal != -1) {
+            complexChildCount = complexChildCount + 1
+          }
+        } else {
+          // count the invisible columns that appear before this column
+          val invisibleIndexCount = invisibleIndex.count(index => index < col.getSchemaOrdinal)
+          if (col.getDataType.isComplexType) {
+            // Calculate the re-arranged index by excluding the complex child
+            // count, as the projection will contain only the parent columns.
+            reArrangedIndex = reArrangedIndex :+
+                              (col.getSchemaOrdinal - complexChildCount - invisibleIndexCount)
+          } else {
+            if (partitionColumnNames != null && partitionColumnNames.contains(col.getColumnName)) {
+              partitionIndex = partitionIndex :+ (col.getSchemaOrdinal - invisibleIndexCount)
+              skipPartitionColumn = true
+            } else {
+              reArrangedIndex = reArrangedIndex :+ (col.getSchemaOrdinal - invisibleIndexCount)
+            }
+          }
+          if (!skipPartitionColumn) {
+            selectedColumnSchema = selectedColumnSchema :+ col
+          }
+        }
+    }
+    if (partitionColumnSchema != null) {
+      // keep partition columns in the end
+      selectedColumnSchema = selectedColumnSchema ++ partitionColumnSchema
+    }
+    if (partitionIndex.nonEmpty) {
+      // keep partition columns in the end and in the original create order
+      reArrangedIndex = reArrangedIndex ++ partitionIndex.sortBy(x => x)
+    }
+    var processedProject: Boolean = false
+    // check whether the first node is a projection
+    logicalPlan match {
+      case _: Project =>
+        // a project is already present as the first node
+      case _ =>
+        // If a project is not present, add one so it can be re-arranged
+        logicalPlan = Project(logicalPlan.output, logicalPlan)
+    }
+    // Re-arrange the project as per columnSchema
+    val newLogicalPlan = logicalPlan.transformDown {
+      //      case logicalRelation: LogicalRelation =>
+      //        getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalRelation)
+      //      case hiveRelation: HiveTableRelation =>
+      //        getReArrangedSchemaHiveRelation(reArrangedIndex, hiveRelation)
+      case p: Project =>
+        var oldProjectionList = p.projectList
+        if (!processedProject) {
+          if (partition.nonEmpty) {
+            // When the partition keyword is present in the insert, the select
+            // query's partition projections may not match the create-table
+            // order. So, re-arrange them into the create-table order.
+            val dynamicPartition = partition.filterNot(entry => entry._2.isDefined)
+            var index = 0
+            val map = mutable.Map[String, Int]()
+            for (part <- dynamicPartition) {
+              map(part._1) = index
+              index = index + 1
+            }
+            var tempList = oldProjectionList.take(oldProjectionList.size - dynamicPartition.size)
+            val partitionList = oldProjectionList.takeRight(dynamicPartition.size)
+            val partitionSchema = table.getPartitionInfo.getColumnSchemaList.asScala
+            for (partitionCol <- partitionSchema) {
+              if (map.get(partitionCol.getColumnName).isDefined) {
+                tempList = tempList :+ partitionList(map(partitionCol.getColumnName))
+              }
+            }
+            oldProjectionList = tempList
+          }
+          if (reArrangedIndex.size != oldProjectionList.size) {
+            // for a non-partition table, the column count must match
+            if (partition.isEmpty) {
+              throw new AnalysisException(
+                s"Cannot insert into table $tableName because the number of columns are " +
+                s"different: " +
+                s"need ${ reArrangedIndex.size } columns, " +
+                s"but query has ${ oldProjectionList.size } columns.")
+            } else {
+              if (reArrangedIndex.size - oldProjectionList.size != convertedStaticPartition.size) {
+                throw new AnalysisException(
+                  s"Cannot insert into table $tableName because the number of columns are " +
+                  s"different: need ${ reArrangedIndex.size } columns, " +
+                  s"but query has ${ oldProjectionList.size } columns.")
+              } else {
+                // TODO: for the partition case, do the remaining projections need validation?
+              }
+            }
+          }
+          var newProjectionList: Seq[NamedExpression] = Seq.empty
+          var i = 0
+          while (i < reArrangedIndex.size) {
+            // column schema already has the sortColumns-dimensions-measures
+            // order. Collect the ordinals and re-arrange the projection in
+            // the same order.
+            if (partition.nonEmpty &&
+                convertedStaticPartition.contains(selectedColumnSchema(i).getColumnName
+                  .toLowerCase())) {
+              // If the column is present in the static partition spec,
+              // add a literal value expression to the project.
+              val value = convertedStaticPartition(selectedColumnSchema(i).getColumnName
+                .toLowerCase())
+              newProjectionList = newProjectionList :+
+                                  Alias(new Literal(value,
+                                    SparkDataTypeConverterImpl.convertCarbonToSparkDataType(
+                                      selectedColumnSchema(i).getDataType)), value.toString)(
+                                    NamedExpression.newExprId,
+                                    None,
+                                    None).asInstanceOf[NamedExpression]
+            } else {
+              // If the column is NOT a static partition column,
+              // map the projection column by its ordinal.
+              if (partition.contains(selectedColumnSchema(i).getColumnName.toLowerCase())) {
+                // static + dynamic partition case: the dynamic partition
+                // ordinal exceeds the projection size, so shift it back by
+                // the static partition count.
+                newProjectionList = newProjectionList :+
+                                    oldProjectionList(
+                                      reArrangedIndex(i) - convertedStaticPartition.size)
+              } else {
+                newProjectionList = newProjectionList :+
+                                    oldProjectionList(reArrangedIndex(i))
+              }
+            }
+            i = i + 1
+          }
+          processedProject = true
+          Project(newProjectionList, p.child)
+        } else {
+          p
+        }
+    }
+    scanResultRdd = sparkSession.sessionState.executePlan(newLogicalPlan).toRdd
+    if (logicalPartitionRelation != null) {
+      logicalPartitionRelation =
+        getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalPartitionRelation)
+    }
+    // Delete stale segment folders that are not in table status but are physically present in
+    // the Fact folder
+    LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName")
+    TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
+    var isUpdateTableStatusRequired = false
+    val uuid = ""
+    try {
+      val (tableDataMaps, dataMapOperationContext) =
+        CommonLoadUtils.firePreLoadEvents(sparkSession,
+          carbonLoadModel,
+          uuid,
+          table,
+          isOverwriteTable,
+          operationContext)
+      // The system has to partition the data first and then call the data load
+      LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
+      // Clean up the old invalid segment data before creating a new entry for new load.
+      SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions)
+      // add the start entry for the new load in the table status file
+      if (!table.isHivePartitionTable) {
+        CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
+          carbonLoadModel,
+          isOverwriteTable)
+        isUpdateTableStatusRequired = true
+      }
+      if (isOverwriteTable) {
+        LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
+      }
+      // Create the table and metadata folders if they do not exist
+      if (carbonLoadModel.isCarbonTransactionalTable) {
+        val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
+        if (!FileFactory.isFileExist(metadataDirectoryPath)) {
+          FileFactory.mkdirs(metadataDirectoryPath)
+        }
       } else {
-        Dataset.ofRows(sparkSession, child)
+        carbonLoadModel.setSegmentId(System.nanoTime().toString)
       }
-    val header = relation.tableSchema.get.fields.map(_.name).mkString(",")
-    loadCommand = CarbonLoadDataCommand(
-      databaseNameOp = Some(relation.carbonRelation.databaseName),
-      tableName = relation.carbonRelation.tableName,
-      factPathFromUser = null,
-      dimFilesPath = Seq(),
-      options = scala.collection.immutable.Map("fileheader" -> header),
-      isOverwriteTable = overwrite,
-      inputSqlString = null,
-      dataFrame = Some(df),
-      updateModel = None,
-      tableInfoOp = None,
-      internalOptions = Map.empty,
-      partition = partition)
-    val load = loadCommand.processMetadata(sparkSession)
-    if (isPersistRequired) {
-      df.unpersist()
-    }
-    load
+      val partitionStatus = SegmentStatus.SUCCESS
+
+      val loadParams = CarbonLoadParams(sparkSession,
+        tableName,
+        sizeInBytes,
+        isOverwriteTable,
+        carbonLoadModel,
+        hadoopConf,
+        logicalPartitionRelation,
+        dateFormat,
+        timeStampFormat,
+        options,
+        finalPartition,
+        currPartitions,
+        partitionStatus,
+        None,
+        Some(scanResultRdd),
+        None,
+        operationContext)
+
+      LOGGER.info("Sort Scope : " + carbonLoadModel.getSortScope)
+      val (rows, loadResult) = insertData(loadParams)
+      val info = CommonLoadUtils.makeAuditInfo(loadResult)
+      setAuditInfo(info)
+      CommonLoadUtils.firePostLoadEvents(sparkSession,
+        carbonLoadModel,
+        tableDataMaps,
+        dataMapOperationContext,
+        table,
+        operationContext)
+    } catch {
+      case CausedBy(ex: NoRetryException) =>
+        // update the load entry in the table status file, changing the status to marked-for-delete
+        if (isUpdateTableStatusRequired) {
+          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
+        }
+        LOGGER.error(s"Dataload failure for $dbName.$tableName", ex)
+        throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
+      // In case of an event-related exception
+      case preEventEx: PreEventException =>
+        LOGGER.error(s"Dataload failure for $dbName.$tableName", preEventEx)
+        throw new AnalysisException(preEventEx.getMessage)
+      case ex: Exception =>
+        LOGGER.error(ex)
+        // update the load entry in the table status file, changing the status to marked-for-delete
+        if (isUpdateTableStatusRequired) {
+          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
+        }
+        throw ex
+    }
+    Seq.empty
   }
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
-    if (null != loadCommand) {
-      val rows = loadCommand.processData(sparkSession)
-      setAuditInfo(loadCommand.auditInfo)
-      rows
+
+  def getReArrangedSchemaLogicalRelation(reArrangedIndex: Seq[Int],
+      logicalRelation: LogicalRelation): LogicalRelation = {
+    if (reArrangedIndex.size != logicalRelation.schema.size) {
+      throw new AnalysisException(
+        s"Cannot insert into table $tableName because the number of columns are different: " +
+        s"need ${ reArrangedIndex.size } columns, " +
+        s"but query has ${ logicalRelation.schema.size } columns.")
+    }
+    val reArrangedFields = new Array[StructField](logicalRelation.schema.size)
+    val reArrangedAttributes = new Array[AttributeReference](logicalRelation.schema.size)
+    val fields = logicalRelation.schema.fields
+    val output = logicalRelation.output
+    var i = 0
+    for (index <- reArrangedIndex) {
+      reArrangedFields(i) = fields(index)
+      reArrangedAttributes(i) = output(index)
+      i = i + 1
+    }
+    val catalogTable = logicalRelation.catalogTable
+      .get
+      .copy(schema = new StructType(reArrangedFields))
+    logicalRelation.copy(logicalRelation.relation,
+      reArrangedAttributes,
+      Some(catalogTable))
+  }
+
+  def getReArrangedSchemaHiveRelation(reArrangedIndex: Seq[Int],
+      hiveTableRelation: HiveTableRelation): HiveTableRelation = {
+    if (reArrangedIndex.size != hiveTableRelation.schema.size) {
+      throw new AnalysisException(
+        s"Cannot insert into table $tableName because the number of columns are different: " +
+        s"need ${ reArrangedIndex.size } columns, " +
+        s"but query has ${ hiveTableRelation.schema.size } columns.")
+    }
+    val reArrangedFields = new Array[StructField](hiveTableRelation.schema.size)
+    val reArrangedAttributes = new Array[AttributeReference](hiveTableRelation.schema.size)
+    val fields = hiveTableRelation.schema.fields
+    val output = hiveTableRelation.output
+    var i = 0
+    for (index <- reArrangedIndex) {
+      reArrangedFields(i) = fields(index)
+      reArrangedAttributes(i) = output(index)
+      i = i + 1
+    }
+    val catalogTable = hiveTableRelation.tableMeta.copy(schema = new StructType(reArrangedFields))
+    hiveTableRelation.copy(catalogTable,
+      reArrangedAttributes)
+  }
+
+  def insertData(loadParams: CarbonLoadParams): (Seq[Row], LoadMetadataDetails) = {
+    var rows = Seq.empty[Row]
+    val table = loadParams.carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    var loadResult : LoadMetadataDetails = null
+    if (table.isHivePartitionTable) {
+      rows = CommonLoadUtils.loadDataWithPartition(loadParams)
     } else {
-      Seq.empty
+      loadResult = CarbonDataRDDFactory.loadCarbonData(loadParams.sparkSession.sqlContext,
+        loadParams.carbonLoadModel,
+        loadParams.partitionStatus,
+        isOverwriteTable,
+        loadParams.hadoopConf,
+        None,
+        loadParams.scanResultRDD,
+        None,
+        operationContext)
     }
+    (rows, loadResult)
   }
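// Editor's sketch, not part of the PR diff: how a caller might consume the
// tuple returned by insertData — only one element is populated, depending on
// which branch above ran. `loadParams` and `LOGGER` are assumed to be in scope.
val (rows, loadResult) = insertData(loadParams)
if (loadResult != null) {
  // non-partition path: CarbonDataRDDFactory.loadCarbonData returned its status
  LOGGER.info(s"segment status: ${loadResult.getSegmentStatus}")
} else {
  // Hive-partition path: rows come from CommonLoadUtils.loadDataWithPartition
  LOGGER.info(s"partition load returned ${rows.size} row(s)")
}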
 
   override protected def opName: String = {
-    if (overwrite) {
+    if (isOverwriteTable) {
       "INSERT OVERWRITE"
     } else {
       "INSERT INTO"
     }
   }
+
+  private def isAlteredSchema(tableSchema: TableSchema): Boolean = {
+    if (tableInfoOp.get.getFactTable.getSchemaEvolution != null) {
+      for (entry: SchemaEvolutionEntry <- tableInfoOp
 
 Review comment:
  do not use a `for` statement here; use `tableInfoOp....exists` instead
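To illustrate the suggestion, a minimal sketch of `isAlteredSchema` rewritten with `exists` in place of the `for` loop. Since the hunk is truncated above, the schema-evolution accessors (`getSchemaEvolutionEntryList`, `getAdded`, `getRemoved`) are assumptions about the CarbonData API, not taken from the PR:

    import scala.collection.JavaConverters._

    private def isAlteredSchema(tableSchema: TableSchema): Boolean = {
      // Option(...).exists short-circuits the null check the `if` used to do
      Option(tableInfoOp.get.getFactTable.getSchemaEvolution).exists { evolution =>
        // true if any evolution entry added or removed columns (assumed accessors)
        evolution.getSchemaEvolutionEntryList.asScala.exists { entry =>
          entry.getAdded != null || entry.getRemoved != null
        }
      }
    }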


[GitHub] [carbondata] jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
jackylk commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376195656
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala
 ##########
 @@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import java.text.SimpleDateFormat
+import java.util
+
+import org.apache.spark.sql.{AnalysisException, CarbonUtils, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{DataLoadTableFileMapping, UpdateTableModel}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, SparkUtil}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+
+/*
+ * Insert into with DataFrame; does not use a logical plan.
+ */
+case class CarbonInsertIntoWithDf(databaseNameOp: Option[String],
+    tableName: String,
+    options: Map[String, String],
+    isOverwriteTable: Boolean,
+    var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
+    var dataFrame: DataFrame,
+    var inputSqlString: String = null,
+    var updateModel: Option[UpdateTableModel] = None,
+    var tableInfoOp: Option[TableInfo] = None,
+    var internalOptions: Map[String, String] = Map.empty,
+    var partition: Map[String, Option[String]] = Map.empty,
+    var operationContext: OperationContext = new OperationContext) {
+
+  var table: CarbonTable = _
+
+  var logicalPartitionRelation: LogicalRelation = _
+
+  var sizeInBytes: Long = _
+
+  var currPartitions: util.List[PartitionSpec] = _
+
+  var parentTablePath: String = _
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  var finalPartition: Map[String, Option[String]] = Map.empty
+
+  var timeStampFormat: SimpleDateFormat = _
+
+  var dateFormat: SimpleDateFormat = _
+
+  def process(sparkSession: SparkSession): Seq[Row] = {
 
 Review comment:
  this function is too big; separate the logic into smaller, more manageable functions
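As one reading of this suggestion, a hypothetical decomposition of `process` into named phases. Every method name and stub body below is illustrative, not from the PR; the stubs exist only so the sketch compiles:

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.carbondata.processing.loading.model.CarbonLoadModel

    // Top-level flow reads as a pipeline of single-purpose steps.
    def process(sparkSession: SparkSession): Seq[Row] = {
      val loadModel = buildLoadModel(sparkSession)
      firePreLoadEvents(loadModel)
      try {
        val rows = executeInsert(sparkSession, loadModel)
        firePostLoadEvents(loadModel)
        rows
      } catch {
        case ex: Exception =>
          markLoadFailed(loadModel)
          throw ex
      }
    }

    // Stubs so the sketch compiles; real bodies would carry the logic
    // currently inlined in process().
    private def buildLoadModel(sparkSession: SparkSession): CarbonLoadModel = new CarbonLoadModel
    private def firePreLoadEvents(model: CarbonLoadModel): Unit = ()
    private def executeInsert(sparkSession: SparkSession, model: CarbonLoadModel): Seq[Row] = Seq.empty
    private def firePostLoadEvents(model: CarbonLoadModel): Unit = ()
    private def markLoadFailed(model: CarbonLoadModel): Unit = ()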
