[GitHub] [carbondata] ajantha-bhat opened a new pull request #3538: [WIP] Separate Insert and load to later optimize insert.

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376349173
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##########
 @@ -17,85 +17,518 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
-import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataCommand}
-import org.apache.spark.storage.StorageLevel
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
-
-case class CarbonInsertIntoCommand(
-    relation: CarbonDatasourceHadoopRelation,
-    child: LogicalPlan,
-    overwrite: Boolean,
-    partition: Map[String, Option[String]])
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.CausedBy
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/*
+ * Insert into flow without a DataFrame, using only the logical plan.
+ */
+case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
+    tableName: String,
+    options: Map[String, String],
+    isOverwriteTable: Boolean,
+    var logicalPlan: LogicalPlan,
+    var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
+    var inputSqlString: String = null,
+    var tableInfoOp: Option[TableInfo] = None,
+    var internalOptions: Map[String, String] = Map.empty,
+    var partition: Map[String, Option[String]] = Map.empty,
+    var operationContext: OperationContext = new OperationContext)
   extends AtomicRunnableCommand {
 
-  var loadCommand: CarbonLoadDataCommand = _
+  var table: CarbonTable = _
+
+  var logicalPartitionRelation: LogicalRelation = _
+
+  var sizeInBytes: Long = _
+
+  var currPartitions: util.List[PartitionSpec] = _
+
+  var parentTablePath: String = _
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  var scanResultRdd: RDD[InternalRow] = _
+
+  var timeStampFormat: SimpleDateFormat = _
+
+  var dateFormat: SimpleDateFormat = _
+
+  var finalPartition: Map[String, Option[String]] = Map.empty
+
+  var isInsertIntoWithConverterFlow: Boolean = false
+
+  var dataFrame: DataFrame = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    setAuditTable(relation.carbonTable.getDatabaseName, relation.carbonTable.getTableName)
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    def containsLimit(plan: LogicalPlan): Boolean = {
-      plan find {
-        case limit: GlobalLimit => true
-        case other => false
-      } isDefined
+    if (tableInfoOp.isEmpty) {
+      throw new RuntimeException("table info must be present when the logical relation exists")
     }
-
+    // If the logical plan is unresolved, convert it to a resolved plan.
+    dataFrame = Dataset.ofRows(sparkSession, logicalPlan)
+    logicalPlan = dataFrame.queryExecution.analyzed
+    var isInsertFromTable = false
+    logicalPlan.collect {
+      case _: LogicalRelation =>
+        isInsertFromTable = true
+    }
+    // Currently projection re-ordering is based on the schema ordinal.
+    // In some alter table scenarios the schema ordinal logic cannot be applied,
+    // so such cases are sent to the old flow.
+    // TODO: Handle this in the future; it must use the new flow.
+    if (!isInsertFromTable || isAlteredSchema(tableInfoOp.get.getFactTable)) {
+      isInsertIntoWithConverterFlow = true
+    }
+    if (isInsertIntoWithConverterFlow) {
+      return Seq.empty
+    }
+    setAuditTable(CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession), tableName)
     ThreadLocalSessionInfo
       .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
-    val isPersistEnabledUserValue = CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED,
-        CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED_DEFAULT)
-    val isPersistRequired =
-      isPersistEnabledUserValue.equalsIgnoreCase("true") || containsLimit(child)
-    val df =
-      if (isPersistRequired) {
-        LOGGER.info("Persist enabled for Insert operation")
-        Dataset.ofRows(sparkSession, child).persist(
-          StorageLevel.fromString(
-            CarbonProperties.getInstance.getInsertIntoDatasetStorageLevel))
+    val (sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition) = CommonLoadUtils
+      .processMetadataCommon(
+        sparkSession,
+        databaseNameOp,
+        tableName,
+        tableInfoOp,
+        partition)
+    this.sizeInBytes = sizeInBytes
+    this.table = table
+    this.logicalPartitionRelation = logicalPartitionRelation
+    this.finalPartition = finalPartition
+    setAuditTable(dbName, tableName)
+    Seq.empty
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (isInsertIntoWithConverterFlow) {
+      return CarbonInsertIntoWithDf(
+        databaseNameOp,
+        tableName,
+        options,
+        isOverwriteTable,
+        dimFilesPath,
+        dataFrame,
+        inputSqlString,
+        None,
+        tableInfoOp,
+        internalOptions,
+        partition).process(sparkSession)
+    }
+    val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
+    val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    carbonProperty.addProperty("zookeeper.enable.lock", "false")
+    val factPath = ""
+    currPartitions = CommonLoadUtils.getCurrentParitions(sparkSession, table)
+    CommonLoadUtils.setNumberOfCoresWhileLoading(sparkSession, carbonProperty)
+    val optionsFinal: util.Map[String, String] =
+      CommonLoadUtils.getFinalLoadOptions(
+      carbonProperty, table, options)
+    val carbonLoadModel: CarbonLoadModel = CommonLoadUtils.prepareLoadModel(
+      hadoopConf,
+      factPath,
+      optionsFinal, parentTablePath, table, isDataFrame = true, internalOptions, partition, options)
+
+    val (tf, df) = CommonLoadUtils.getTimeAndDateFormatFromLoadModel(
+      carbonLoadModel)
+    timeStampFormat = tf
+    dateFormat = df
+
+    var complexChildCount: Int = 0
+    var reArrangedIndex: Seq[Int] = Seq()
+    var selectedColumnSchema: Seq[ColumnSchema] = Seq()
+    var partitionIndex: Seq[Int] = Seq()
+
+    val columnSchema = tableInfoOp.get.getFactTable.getListOfColumns.asScala
+    val partitionInfo = tableInfoOp.get.getFactTable.getPartitionInfo
+    val partitionColumnSchema =
+      if (partitionInfo != null && partitionInfo.getColumnSchemaList.size() != 0) {
+        partitionInfo.getColumnSchemaList.asScala
+      } else {
+        null
+      }
+    val convertedStaticPartition = mutable.Map[String, AnyRef]()
+    // Remove the thread local entries of previous configurations.
 
 Review comment:
   It is a valid comment; I will make it more understandable:
   
   `setting a converter will remove the thread local entries of previous load configurations.`
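
   For context, the clarified comment sits directly above the converter call
   shown in the diff. A minimal sketch of the intended result (the imports are
   the ones already used in this file):

       import org.apache.carbondata.core.util.DataTypeUtil
       import org.apache.carbondata.converter.SparkDataTypeConverterImpl

       // Setting a converter will remove the thread local entries of
       // previous load configurations.
       DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)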


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376349852
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##########
 @@ -17,85 +17,518 @@
 
+    val convertedStaticPartition = mutable.Map[String, AnyRef]()
+    // Remove the thread local entries of previous configurations.
+    DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
+    if (partition.nonEmpty) {
+      for (col <- partitionColumnSchema) {
+        if (partition(col.getColumnName.toLowerCase).isDefined) {
+          convertedStaticPartition(col.getColumnName.toLowerCase) =
+            CarbonScalaUtil.convertStaticPartitionToValues(partition(col.getColumnName.toLowerCase)
 
 Review comment:
   ok
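
   For illustration, a minimal sketch of what the conversion above does for a
   single static DATE partition value. The call signature is the one used in
   the diff; the value and the date/timestamp patterns are hypothetical
   (assumed defaults, not read from the load model):

       import java.text.SimpleDateFormat
       import org.apache.spark.sql.types.DateType
       import org.apache.carbondata.spark.util.CarbonScalaUtil

       // Hypothetical static partition value for a DATE partition column;
       // the formats below are assumptions for this sketch.
       val timeStampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
       val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
       val converted = CarbonScalaUtil.convertStaticPartitionToValues(
         "2020-02-07", DateType, timeStampFormat, dateFormat)
       // 'converted' is the typed value later wrapped in a Literal for the
       // re-arranged projection.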


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376351144
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##########
 @@ -17,85 +17,518 @@
 
+    if (partition.nonEmpty) {
+      for (col <- partitionColumnSchema) {
+        if (partition(col.getColumnName.toLowerCase).isDefined) {
+          convertedStaticPartition(col.getColumnName.toLowerCase) =
+            CarbonScalaUtil.convertStaticPartitionToValues(partition(col.getColumnName.toLowerCase)
+              .get,
+              SparkDataTypeConverterImpl.convertCarbonToSparkDataType(col.getDataType),
+              timeStampFormat,
+              dateFormat)
+        }
+      }
+    }
+    val partitionColumnNames = if (partitionColumnSchema != null) {
+      partitionColumnSchema.map(x => x.getColumnName).toSet
+    } else {
+      null
+    }
+    // Get invisible column indexes; alter table scenarios can have them before or after a new
+    // column. The dummy measure has ordinal -1 and is invisible, so ignore that column.
+    // Old columns from alter table are just invisible columns with a proper ordinal.
+    val invisibleIndex = columnSchema.filter(col => col.isInvisible && col.getSchemaOrdinal != -1)
+      .map(col => col.getSchemaOrdinal)
+    columnSchema.filterNot(col => col.isInvisible).foreach {
+      col =>
+        var skipPartitionColumn = false
+        if (col.getColumnName.contains(".")) {
+          // If the schema ordinal is -1,
+          // no need to consider it during shifting columns to derive new shifted ordinal
+          if (col.getSchemaOrdinal != -1) {
+            complexChildCount = complexChildCount + 1
+          }
+        } else {
+          // get number of invisible index count before this column
+          val invisibleIndexCount = invisibleIndex.count(index => index < col.getSchemaOrdinal)
+          if (col.getDataType.isComplexType) {
+            // Calculate re-arrange index by ignoring the complex child count.
+            // As projection will have only parent columns
+            reArrangedIndex = reArrangedIndex :+
+                              (col.getSchemaOrdinal - complexChildCount - invisibleIndexCount)
+          } else {
+            if (partitionColumnNames != null && partitionColumnNames.contains(col.getColumnName)) {
+              partitionIndex = partitionIndex :+ (col.getSchemaOrdinal - invisibleIndexCount)
+              skipPartitionColumn = true
+            } else {
+              reArrangedIndex = reArrangedIndex :+ (col.getSchemaOrdinal - invisibleIndexCount)
+            }
+          }
+          if (!skipPartitionColumn) {
+            selectedColumnSchema = selectedColumnSchema :+ col
+          }
+        }
+    }
+    if (partitionColumnSchema != null) {
+      // keep partition columns in the end
+      selectedColumnSchema = selectedColumnSchema ++ partitionColumnSchema
+    }
+    if (partitionIndex.nonEmpty) {
+      // keep partition columns in the end and in the original create order
+      reArrangedIndex = reArrangedIndex ++ partitionIndex.sortBy(x => x)
+    }
+    var processedProject: Boolean = false
+    // check whether the first node is a projection
+    logicalPlan match {
+      case _: Project =>
+        // project is already present as first node
+      case _ =>
+        // If project is not present, add the projection to re-arrange it
+        logicalPlan = Project(logicalPlan.output, logicalPlan)
+    }
+    // Re-arrange the project as per columnSchema
+    val newLogicalPlan = logicalPlan.transformDown {
+      //      case logicalRelation: LogicalRelation =>
+      //        getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalRelation)
+      //      case hiveRelation: HiveTableRelation =>
+      //        getReArrangedSchemaHiveRelation(reArrangedIndex, hiveRelation)
+      case p: Project =>
+        var oldProjectionList = p.projectList
+        if (!processedProject) {
+          if (partition.nonEmpty) {
+            // The partition keyword is present in the insert statement, and the select
+            // query's partition projections may not match the create order.
+            // So, bring them to the create table order.
+            val dynamicPartition = partition.filterNot(entry => entry._2.isDefined)
+            var index = 0
+            val map = mutable.Map[String, Int]()
+            for (part <- dynamicPartition) {
+              map(part._1) = index
+              index = index + 1
+            }
+            var tempList = oldProjectionList.take(oldProjectionList.size - dynamicPartition.size)
+            val partitionList = oldProjectionList.takeRight(dynamicPartition.size)
+            val partitionSchema = table.getPartitionInfo.getColumnSchemaList.asScala
+            for (partitionCol <- partitionSchema) {
+              if (map.get(partitionCol.getColumnName).isDefined) {
+                tempList = tempList :+ partitionList(map(partitionCol.getColumnName))
+              }
+            }
+            oldProjectionList = tempList
+          }
+          if (reArrangedIndex.size != oldProjectionList.size) {
+            // for non-partition table columns must match
+            if (partition.isEmpty) {
+              throw new AnalysisException(
+                s"Cannot insert into table $tableName because the number of columns are " +
+                s"different: " +
+                s"need ${ reArrangedIndex.size } columns, " +
+                s"but query has ${ oldProjectionList.size } columns.")
+            } else {
+              if (reArrangedIndex.size - oldProjectionList.size != convertedStaticPartition.size) {
+                throw new AnalysisException(
+                  s"Cannot insert into table $tableName because the number of columns are " +
+                  s"different: need ${ reArrangedIndex.size } columns, " +
+                  s"but query has ${ oldProjectionList.size } columns.")
+              } else {
+                // TODO: For the partition case, do the remaining projections need validation?
+              }
+            }
+          }
+          var newProjectionList: Seq[NamedExpression] = Seq.empty
+          var i = 0
+          while (i < reArrangedIndex.size) {
+            // column schema is already in sortColumns-dimensions-measures order. Collect the
+            // ordinal & re-arrange the projection in the same order
+            if (partition.nonEmpty &&
+                convertedStaticPartition.contains(selectedColumnSchema(i).getColumnName
+                  .toLowerCase())) {
+              // If the column schema is present in partitionSchema, it is a static partition;
+              // add a value literal expression in the project.
+              val value = convertedStaticPartition(selectedColumnSchema(i).getColumnName
+                .toLowerCase())
+              newProjectionList = newProjectionList :+
+                                  Alias(new Literal(value,
+                                    SparkDataTypeConverterImpl.convertCarbonToSparkDataType(
+                                      selectedColumnSchema(i).getDataType)), value.toString)(
+                                    NamedExpression.newExprId,
+                                    None,
+                                    None).asInstanceOf[NamedExpression]
+            } else {
+              // If the column schema is NOT present in the partition columns,
+              // get the projection column mapped to its ordinal.
+              if (partition.contains(selectedColumnSchema(i).getColumnName.toLowerCase())) {
+                // static partition + dynamic partition case; here the dynamic partition
+                // ordinal will be larger than the projection size
+                newProjectionList = newProjectionList :+
+                                    oldProjectionList(
+                                      reArrangedIndex(i) - convertedStaticPartition.size)
+              } else {
+                newProjectionList = newProjectionList :+
+                                    oldProjectionList(reArrangedIndex(i))
+              }
+            }
+            i = i + 1
+          }
+          processedProject = true
+          Project(newProjectionList, p.child)
+        } else {
+          p
+        }
+    }
+    scanResultRdd = sparkSession.sessionState.executePlan(newLogicalPlan).toRdd
+    if (logicalPartitionRelation != null) {
+      logicalPartitionRelation =
+        getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalPartitionRelation)
+    }
+    // Delete stale segment folders that are not in table status but are physically present in
+    // the Fact folder
+    LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName")
+    TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
+    var isUpdateTableStatusRequired = false
+    val uuid = ""
+    try {
+      val (tableDataMaps, dataMapOperationContext) =
+        CommonLoadUtils.firePreLoadEvents(sparkSession,
+          carbonLoadModel,
+          uuid,
+          table,
+          isOverwriteTable,
+          operationContext)
+      // The system has to partition the data first and then call the data load.
+      LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
+      // Clean up the old invalid segment data before creating a new entry for new load.
+      SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions)
+      // add the start entry for the new load in the table status file
+      if (!table.isHivePartitionTable) {
+        CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
+          carbonLoadModel,
+          isOverwriteTable)
+        isUpdateTableStatusRequired = true
+      }
+      if (isOverwriteTable) {
+        LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
+      }
+      // Create table and metadata folders if not exist
+      if (carbonLoadModel.isCarbonTransactionalTable) {
+        val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
+        if (!FileFactory.isFileExist(metadataDirectoryPath)) {
+          FileFactory.mkdirs(metadataDirectoryPath)
+        }
       } else {
-        Dataset.ofRows(sparkSession, child)
+        carbonLoadModel.setSegmentId(System.nanoTime().toString)
       }
-    val header = relation.tableSchema.get.fields.map(_.name).mkString(",")
-    loadCommand = CarbonLoadDataCommand(
-      databaseNameOp = Some(relation.carbonRelation.databaseName),
-      tableName = relation.carbonRelation.tableName,
-      factPathFromUser = null,
-      dimFilesPath = Seq(),
-      options = scala.collection.immutable.Map("fileheader" -> header),
-      isOverwriteTable = overwrite,
-      inputSqlString = null,
-      dataFrame = Some(df),
-      updateModel = None,
-      tableInfoOp = None,
-      internalOptions = Map.empty,
-      partition = partition)
-    val load = loadCommand.processMetadata(sparkSession)
-    if (isPersistRequired) {
-      df.unpersist()
-    }
-    load
+      val partitionStatus = SegmentStatus.SUCCESS
+
+      val loadParams = CarbonLoadParams(sparkSession,
+        tableName,
+        sizeInBytes,
+        isOverwriteTable,
+        carbonLoadModel,
+        hadoopConf,
+        logicalPartitionRelation,
+        dateFormat,
+        timeStampFormat,
+        options,
+        finalPartition,
+        currPartitions,
+        partitionStatus,
+        None,
+        Some(scanResultRdd),
+        None,
+        operationContext)
+
+      LOGGER.info("Sort Scope : " + carbonLoadModel.getSortScope)
+      val (rows, loadResult) = insertData(loadParams)
+      val info = CommonLoadUtils.makeAuditInfo(loadResult)
+      setAuditInfo(info)
+      CommonLoadUtils.firePostLoadEvents(sparkSession,
+        carbonLoadModel,
+        tableDataMaps,
+        dataMapOperationContext,
+        table,
+        operationContext)
+    } catch {
+      case CausedBy(ex: NoRetryException) =>
+        // update the load entry in table status file for changing the status to marked for delete
+        if (isUpdateTableStatusRequired) {
+          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
+        }
+        LOGGER.error(s"Dataload failure for $dbName.$tableName", ex)
+        throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
+      // In case of event related exception
+      case preEventEx: PreEventException =>
+        LOGGER.error(s"Dataload failure for $dbName.$tableName", preEventEx)
+        throw new AnalysisException(preEventEx.getMessage)
+      case ex: Exception =>
+        LOGGER.error(ex)
+        // update the load entry in table status file for changing the status to marked for delete
+        if (isUpdateTableStatusRequired) {
+          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
+        }
+        throw ex
+    }
+    Seq.empty
   }
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
-    if (null != loadCommand) {
-      val rows = loadCommand.processData(sparkSession)
-      setAuditInfo(loadCommand.auditInfo)
-      rows
+
+  def getReArrangedSchemaLogicalRelation(reArrangedIndex: Seq[Int],
 
 Review comment:
   ok
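
   To make the re-arranged index computation quoted above easier to follow,
   here is a self-contained toy sketch of the same shifting logic (plain case
   classes, not the Carbon schema types; complex-child handling omitted):

       // Columns arrive in store order (sortColumns-dimensions-measures),
       // schemaOrdinal is the create order, invisible columns shift the
       // ordinals, and partition columns go to the end.
       case class Col(name: String, schemaOrdinal: Int,
           invisible: Boolean = false, partition: Boolean = false)

       val colsInStoreOrder = Seq(
         Col("old", 0, invisible = true),   // invisible column left by an old ALTER
         Col("city", 2),                    // sort column, created third
         Col("amount", 1),                  // measure, created second
         Col("dt", 3, partition = true))    // partition column

       val invisibleOrdinals = colsInStoreOrder
         .filter(c => c.invisible && c.schemaOrdinal != -1).map(_.schemaOrdinal)

       var reArrangedIndex = Seq.empty[Int]
       var partitionIndex = Seq.empty[Int]
       colsInStoreOrder.filterNot(_.invisible).foreach { col =>
         // shift the create-order ordinal down by the invisible columns before it
         val shift = invisibleOrdinals.count(_ < col.schemaOrdinal)
         if (col.partition) {
           partitionIndex :+= col.schemaOrdinal - shift
         } else {
           reArrangedIndex :+= col.schemaOrdinal - shift
         }
       }
       reArrangedIndex ++= partitionIndex.sorted
       // reArrangedIndex == Seq(1, 0, 2): pick the 2nd projection column first,
       // then the 1st, and keep the partition column at the end.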


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376351172
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##########
 @@ -17,85 +17,518 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
-import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataCommand}
-import org.apache.spark.storage.StorageLevel
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
-
-case class CarbonInsertIntoCommand(
-    relation: CarbonDatasourceHadoopRelation,
-    child: LogicalPlan,
-    overwrite: Boolean,
-    partition: Map[String, Option[String]])
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.CausedBy
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/*
+* insert into without df, by just using logical plan
+*
+*/
+case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
+    tableName: String,
+    options: Map[String, String],
+    isOverwriteTable: Boolean,
+    var logicalPlan: LogicalPlan,
+    var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
+    var inputSqlString: String = null,
+    var tableInfoOp: Option[TableInfo] = None,
+    var internalOptions: Map[String, String] = Map.empty,
+    var partition: Map[String, Option[String]] = Map.empty,
+    var operationContext: OperationContext = new OperationContext)
   extends AtomicRunnableCommand {
 
-  var loadCommand: CarbonLoadDataCommand = _
+  var table: CarbonTable = _
+
+  var logicalPartitionRelation: LogicalRelation = _
+
+  var sizeInBytes: Long = _
+
+  var currPartitions: util.List[PartitionSpec] = _
+
+  var parentTablePath: String = _
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  var scanResultRdd: RDD[InternalRow] = _
+
+  var timeStampFormat: SimpleDateFormat = _
+
+  var dateFormat: SimpleDateFormat = _
+
+  var finalPartition: Map[String, Option[String]] = Map.empty
+
+  var isInsertIntoWithConverterFlow: Boolean = false
+
+  var dataFrame: DataFrame = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    setAuditTable(relation.carbonTable.getDatabaseName, relation.carbonTable.getTableName)
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    def containsLimit(plan: LogicalPlan): Boolean = {
-      plan find {
-        case limit: GlobalLimit => true
-        case other => false
-      } isDefined
+    if (!tableInfoOp.isDefined) {
+      throw new RuntimeException(" table info must be present when logical relation exist")
     }
-
+    // If logical plan is unresolved, need to convert it to resolved.
+    dataFrame = Dataset.ofRows(sparkSession, logicalPlan)
+    logicalPlan = dataFrame.queryExecution.analyzed
+    var isInsertFromTable = false
+    logicalPlan.collect {
+      case _: LogicalRelation =>
+        isInsertFromTable = true
+    }
+    // Currently projection re-ordering is based on schema ordinal,
+    // for some scenarios in alter table scenario, schema ordinal logic cannot be applied.
+    // So, sending it to old flow
+    // TODO: Handle this in future, this must use new flow.
+    if (!isInsertFromTable || isAlteredSchema(tableInfoOp.get.getFactTable)) {
+      isInsertIntoWithConverterFlow = true
+    }
+    if (isInsertIntoWithConverterFlow) {
+      return Seq.empty
+    }
+    setAuditTable(CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession), tableName)
     ThreadLocalSessionInfo
       .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
-    val isPersistEnabledUserValue = CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED,
-        CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED_DEFAULT)
-    val isPersistRequired =
-      isPersistEnabledUserValue.equalsIgnoreCase("true") || containsLimit(child)
-    val df =
-      if (isPersistRequired) {
-        LOGGER.info("Persist enabled for Insert operation")
-        Dataset.ofRows(sparkSession, child).persist(
-          StorageLevel.fromString(
-            CarbonProperties.getInstance.getInsertIntoDatasetStorageLevel))
+    val (sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition) = CommonLoadUtils
+      .processMetadataCommon(
+        sparkSession,
+        databaseNameOp,
+        tableName,
+        tableInfoOp,
+        partition)
+    this.sizeInBytes = sizeInBytes
+    this.table = table
+    this.logicalPartitionRelation = logicalPartitionRelation
+    this.finalPartition = finalPartition
+    setAuditTable(dbName, tableName)
+    Seq.empty
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (isInsertIntoWithConverterFlow) {
+      return CarbonInsertIntoWithDf(
+        databaseNameOp,
+        tableName,
+        options,
+        isOverwriteTable,
+        dimFilesPath,
+        dataFrame,
+        inputSqlString,
+        None,
+        tableInfoOp,
+        internalOptions,
+        partition).process(sparkSession)
+    }
+    val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
+    val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    carbonProperty.addProperty("zookeeper.enable.lock", "false")
+    val factPath = ""
+    currPartitions = CommonLoadUtils.getCurrentParitions(sparkSession, table)
+    CommonLoadUtils.setNumberOfCoresWhileLoading(sparkSession, carbonProperty)
+    val optionsFinal: util.Map[String, String] =
+      CommonLoadUtils.getFinalLoadOptions(
+      carbonProperty, table, options)
+    val carbonLoadModel: CarbonLoadModel = CommonLoadUtils.prepareLoadModel(
+      hadoopConf,
+      factPath,
+      optionsFinal, parentTablePath, table, isDataFrame = true, internalOptions, partition, options)
+
+    val (tf, df) = CommonLoadUtils.getTimeAndDateFormatFromLoadModel(
+      carbonLoadModel)
+    timeStampFormat = tf
+    dateFormat = df
+
+    var complexChildCount: Int = 0
+    var reArrangedIndex: Seq[Int] = Seq()
+    var selectedColumnSchema: Seq[ColumnSchema] = Seq()
+    var partitionIndex: Seq[Int] = Seq()
+
+    val columnSchema = tableInfoOp.get.getFactTable.getListOfColumns.asScala
+    val partitionInfo = tableInfoOp.get.getFactTable.getPartitionInfo
+    val partitionColumnSchema =
+      if (partitionInfo != null && partitionInfo.getColumnSchemaList.size() != 0) {
+        partitionInfo.getColumnSchemaList.asScala
+      } else {
+        null
+      }
+    val convertedStaticPartition = mutable.Map[String, AnyRef]()
+    // Remove the thread local entries of previous configurations.
+    DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
+    if (partition.nonEmpty) {
+      for (col <- partitionColumnSchema) {
+        if (partition(col.getColumnName.toLowerCase).isDefined) {
+          convertedStaticPartition(col.getColumnName.toLowerCase) =
+            CarbonScalaUtil.convertStaticPartitionToValues(partition(col.getColumnName.toLowerCase)
+              .get,
+              SparkDataTypeConverterImpl.convertCarbonToSparkDataType(col.getDataType),
+              timeStampFormat,
+              dateFormat)
+        }
+      }
+    }
+    val partitionColumnNames = if (partitionColumnSchema != null) {
+      partitionColumnSchema.map(x => x.getColumnName).toSet
+    } else {
+      null
+    }
+    // get invisible column indexes, alter table scenarios can have it before or after new column
+    // dummy measure will have ordinal -1 and it is invisible, ignore that column.
+    // alter table old columns are just invisible columns with proper ordinal
+    val invisibleIndex = columnSchema.filter(col => col.isInvisible && col.getSchemaOrdinal != -1)
+      .map(col => col.getSchemaOrdinal)
+    columnSchema.filterNot(col => col.isInvisible).foreach {
+      col =>
+        var skipPartitionColumn = false
+        if (col.getColumnName.contains(".")) {
+          // If the schema ordinal is -1,
+          // no need to consider it during shifting columns to derive new shifted ordinal
+          if (col.getSchemaOrdinal != -1) {
+            complexChildCount = complexChildCount + 1
+          }
+        } else {
+          // get number of invisible index count before this column
+          val invisibleIndexCount = invisibleIndex.count(index => index < col.getSchemaOrdinal)
+          if (col.getDataType.isComplexType) {
+            // Calculate re-arrange index by ignoring the complex child count.
+            // As projection will have only parent columns
+            reArrangedIndex = reArrangedIndex :+
+                              (col.getSchemaOrdinal - complexChildCount - invisibleIndexCount)
+          } else {
+            if (partitionColumnNames != null && partitionColumnNames.contains(col.getColumnName)) {
+              partitionIndex = partitionIndex :+ (col.getSchemaOrdinal - invisibleIndexCount)
+              skipPartitionColumn = true
+            } else {
+              reArrangedIndex = reArrangedIndex :+ (col.getSchemaOrdinal - invisibleIndexCount)
+            }
+          }
+          if (!skipPartitionColumn) {
+            selectedColumnSchema = selectedColumnSchema :+ col
+          }
+        }
+    }
+    if (partitionColumnSchema != null) {
+      // keep partition columns in the end
+      selectedColumnSchema = selectedColumnSchema ++ partitionColumnSchema
+    }
+    if (partitionIndex.nonEmpty) {
+      // keep partition columns in the end and in the original create order
+      reArrangedIndex = reArrangedIndex ++ partitionIndex.sortBy(x => x)
+    }
+    var processedProject: Boolean = false
+    // check first node is the projection or not
+    logicalPlan match {
+      case _: Project =>
+        // project is already present as first node
+      case _ =>
+        // If project is not present, add the projection to re-arrange it
+        logicalPlan = Project(logicalPlan.output, logicalPlan)
+    }
+    // Re-arrange the project as per columnSchema
+    val newLogicalPlan = logicalPlan.transformDown {
+      //      case logicalRelation: LogicalRelation =>
+      //        getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalRelation)
+      //      case hiveRelation: HiveTableRelation =>
+      //        getReArrangedSchemaHiveRelation(reArrangedIndex, hiveRelation)
+      case p: Project =>
+        var oldProjectionList = p.projectList
+        if (!processedProject) {
+          if (partition.nonEmpty) {
+            // partition keyword is present in insert and
+            // select query partition projections may not be same as create order.
+            // So, bring to create table order
+            val dynamicPartition = partition.filterNot(entry => entry._2.isDefined)
+            var index = 0
+            val map = mutable.Map[String, Int]()
+            for (part <- dynamicPartition) {
+              map(part._1) = index
+              index = index + 1
+            }
+            var tempList = oldProjectionList.take(oldProjectionList.size - dynamicPartition.size)
+            val partitionList = oldProjectionList.takeRight(dynamicPartition.size)
+            val partitionSchema = table.getPartitionInfo.getColumnSchemaList.asScala
+            for (partitionCol <- partitionSchema) {
+              if (map.get(partitionCol.getColumnName).isDefined) {
+                tempList = tempList :+ partitionList(map(partitionCol.getColumnName))
+              }
+            }
+            oldProjectionList = tempList
+          }
+          if (reArrangedIndex.size != oldProjectionList.size) {
+            // for a non-partition table, the column counts must match
+            if (partition.isEmpty) {
+              throw new AnalysisException(
+                s"Cannot insert into table $tableName because the number of columns are " +
+                s"different: " +
+                s"need ${ reArrangedIndex.size } columns, " +
+                s"but query has ${ oldProjectionList.size } columns.")
+            } else {
+              if (reArrangedIndex.size - oldProjectionList.size != convertedStaticPartition.size) {
+                throw new AnalysisException(
+                  s"Cannot insert into table $tableName because the number of columns are " +
+                  s"different: need ${ reArrangedIndex.size } columns, " +
+                  s"but query has ${ oldProjectionList.size } columns.")
+              } else {
+                // TODO: for the partition case, do the remaining projections need validation?
+              }
+            }
+          }
+          var newProjectionList: Seq[NamedExpression] = Seq.empty
+          var i = 0
+          while (i < reArrangedIndex.size) {
+            // column schema already has the sortColumns-dimensions-measures order.
+            // Collect the ordinal and re-arrange the projection in the same order
+            if (partition.nonEmpty &&
+                convertedStaticPartition.contains(selectedColumnSchema(i).getColumnName
+                  .toLowerCase())) {
+              // If the column is present in the static partition spec,
+              // add a value literal expression to the project.
+              val value = convertedStaticPartition(selectedColumnSchema(i).getColumnName
+                .toLowerCase())
+              newProjectionList = newProjectionList :+
+                                  Alias(new Literal(value,
+                                    SparkDataTypeConverterImpl.convertCarbonToSparkDataType(
+                                      selectedColumnSchema(i).getDataType)), value.toString)(
+                                    NamedExpression.newExprId,
+                                    None,
+                                    None).asInstanceOf[NamedExpression]
+            } else {
+              // If the column schema is NOT a static partition column,
+              // get the projection column by mapping its ordinal.
+              if (partition.contains(selectedColumnSchema(i).getColumnName.toLowerCase())) {
+                // static partition + dynamic partition case;
+                // here the dynamic partition ordinal exceeds the projection size
+                newProjectionList = newProjectionList :+
+                                    oldProjectionList(
+                                      reArrangedIndex(i) - convertedStaticPartition.size)
+              } else {
+                newProjectionList = newProjectionList :+
+                                    oldProjectionList(reArrangedIndex(i))
+              }
+            }
+            i = i + 1
+          }
+          processedProject = true
+          Project(newProjectionList, p.child)
+        } else {
+          p
+        }
+    }
+    scanResultRdd = sparkSession.sessionState.executePlan(newLogicalPlan).toRdd
+    if (logicalPartitionRelation != null) {
+      logicalPartitionRelation =
+        getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalPartitionRelation)
+    }
+    // Delete stale segment folders that are not in table status but are physically present in
+    // the Fact folder
+    LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName")
+    TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
+    var isUpdateTableStatusRequired = false
+    val uuid = ""
+    try {
+      val (tableDataMaps, dataMapOperationContext) =
+        CommonLoadUtils.firePreLoadEvents(sparkSession,
+          carbonLoadModel,
+          uuid,
+          table,
+          isOverwriteTable,
+          operationContext)
+      // The system has to partition the data first and then invoke the data load
+      LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
+      // Clean up the old invalid segment data before creating a new entry for new load.
+      SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions)
+      // add the start entry for the new load in the table status file
+      if (!table.isHivePartitionTable) {
+        CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
+          carbonLoadModel,
+          isOverwriteTable)
+        isUpdateTableStatusRequired = true
+      }
+      if (isOverwriteTable) {
+        LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
+      }
+      // Create table and metadata folders if they do not exist
+      if (carbonLoadModel.isCarbonTransactionalTable) {
+        val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
+        if (!FileFactory.isFileExist(metadataDirectoryPath)) {
+          FileFactory.mkdirs(metadataDirectoryPath)
+        }
       } else {
-        Dataset.ofRows(sparkSession, child)
+        carbonLoadModel.setSegmentId(System.nanoTime().toString)
       }
-    val header = relation.tableSchema.get.fields.map(_.name).mkString(",")
-    loadCommand = CarbonLoadDataCommand(
-      databaseNameOp = Some(relation.carbonRelation.databaseName),
-      tableName = relation.carbonRelation.tableName,
-      factPathFromUser = null,
-      dimFilesPath = Seq(),
-      options = scala.collection.immutable.Map("fileheader" -> header),
-      isOverwriteTable = overwrite,
-      inputSqlString = null,
-      dataFrame = Some(df),
-      updateModel = None,
-      tableInfoOp = None,
-      internalOptions = Map.empty,
-      partition = partition)
-    val load = loadCommand.processMetadata(sparkSession)
-    if (isPersistRequired) {
-      df.unpersist()
-    }
-    load
+      val partitionStatus = SegmentStatus.SUCCESS
+
+      val loadParams = CarbonLoadParams(sparkSession,
+        tableName,
+        sizeInBytes,
+        isOverwriteTable,
+        carbonLoadModel,
+        hadoopConf,
+        logicalPartitionRelation,
+        dateFormat,
+        timeStampFormat,
+        options,
+        finalPartition,
+        currPartitions,
+        partitionStatus,
+        None,
+        Some(scanResultRdd),
+        None,
+        operationContext)
+
+      LOGGER.info("Sort Scope : " + carbonLoadModel.getSortScope)
+      val (rows, loadResult) = insertData(loadParams)
+      val info = CommonLoadUtils.makeAuditInfo(loadResult)
+      setAuditInfo(info)
+      CommonLoadUtils.firePostLoadEvents(sparkSession,
+        carbonLoadModel,
+        tableDataMaps,
+        dataMapOperationContext,
+        table,
+        operationContext)
+    } catch {
+      case CausedBy(ex: NoRetryException) =>
+        // update the load entry in table status file for changing the status to marked for delete
+        if (isUpdateTableStatusRequired) {
+          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
+        }
+        LOGGER.error(s"Dataload failure for $dbName.$tableName", ex)
+        throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
+      // In case of event related exception
+      case preEventEx: PreEventException =>
+        LOGGER.error(s"Dataload failure for $dbName.$tableName", preEventEx)
+        throw new AnalysisException(preEventEx.getMessage)
+      case ex: Exception =>
+        LOGGER.error(ex)
+        // update the load entry in table status file for changing the status to marked for delete
+        if (isUpdateTableStatusRequired) {
+          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
+        }
+        throw ex
+    }
+    Seq.empty
   }
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
-    if (null != loadCommand) {
-      val rows = loadCommand.processData(sparkSession)
-      setAuditInfo(loadCommand.auditInfo)
-      rows
+
+  def getReArrangedSchemaLogicalRelation(reArrangedIndex: Seq[Int],
+      logicalRelation: LogicalRelation): LogicalRelation = {
+    if (reArrangedIndex.size != logicalRelation.schema.size) {
+      throw new AnalysisException(
+        s"Cannot insert into table $tableName because the number of columns are different: " +
+        s"need ${ reArrangedIndex.size } columns, " +
+        s"but query has ${ logicalRelation.schema.size } columns.")
+    }
+    val reArrangedFields = new Array[StructField](logicalRelation.schema.size)
+    val reArrangedAttributes = new Array[AttributeReference](logicalRelation.schema.size)
+    val fields = logicalRelation.schema.fields
+    val output = logicalRelation.output
+    var i = 0
+    for (index <- reArrangedIndex) {
+      reArrangedFields(i) = fields(index)
+      reArrangedAttributes(i) = output(index)
+      i = i + 1
+    }
+    val catalogTable = logicalRelation.catalogTable
+      .get
+      .copy(schema = new StructType(reArrangedFields))
+    logicalRelation.copy(logicalRelation.relation,
+      reArrangedAttributes,
+      Some(catalogTable))
+  }
+
+  def getReArrangedSchemaHiveRelation(reArrangedIndex: Seq[Int],
 
 Review comment:
   ok

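For readers following the diff above: the block that builds reArrangedIndex is the heart of the change, so here is a condensed sketch of the same ordinal arithmetic under a simplified, hypothetical column model (a plain case class instead of ColumnSchema, and an explicit complexChild flag instead of checking the column name for a dot). Invisible columns and complex children shift the visible ordinals left, and partition columns are collected separately and appended at the end in create order:

    // Simplified stand-in for ColumnSchema; all fields here are illustrative.
    case class Col(name: String, ordinal: Int, invisible: Boolean = false,
        complexChild: Boolean = false, complex: Boolean = false)

    def reArrangedIndexes(cols: Seq[Col], partitionNames: Set[String]): Seq[Int] = {
      // ordinals of invisible columns, used to shift the visible ones left
      val invisibleOrdinals = cols.filter(_.invisible).map(_.ordinal)
      var complexChildCount = 0
      var main = Seq.empty[Int]
      var partitionIdx = Seq.empty[Int]
      cols.filterNot(_.invisible).foreach { col =>
        if (col.complexChild) {
          // children are folded into the parent projection; ordinal -1 is skipped
          if (col.ordinal != -1) complexChildCount += 1
        } else {
          val invisibleBefore = invisibleOrdinals.count(_ < col.ordinal)
          if (col.complex) {
            // the projection has only parent columns, so ignore the child count
            main :+= col.ordinal - complexChildCount - invisibleBefore
          } else if (partitionNames.contains(col.name)) {
            partitionIdx :+= col.ordinal - invisibleBefore
          } else {
            main :+= col.ordinal - invisibleBefore
          }
        }
      }
      // partition columns go last, in the original create order
      main ++ partitionIdx.sorted
    }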
[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376354706
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadParams.scala
 ##########
 @@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.mutable
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.command.UpdateTableModel
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.statusmanager.SegmentStatus
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
+/*
+ * Intermediate object passed between load functions.
+ */
+case class CarbonLoadParams(sparkSession: SparkSession,
 
 Review comment:
  the load model would become bulky, and it is used in other places as well; this parameter object is only for this flow
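
The trade-off described here is the classic parameter-object pattern: CarbonLoadModel is shared across flows, so widening it for one call site would bloat a common class, while a small case class local to this flow keeps the load helpers' signatures stable. A minimal sketch of the idea (field names trimmed and illustrative, not the real CarbonLoadParams):

    import org.apache.carbondata.events.OperationContext

    // Hypothetical trimmed-down parameter object: adding a field later only
    // touches this class and the readers of that field, not every method signature.
    case class MiniLoadParams(
        tableName: String,
        isOverwriteTable: Boolean,
        options: Map[String, String],
        operationContext: OperationContext)

    def insertData(params: MiniLoadParams): Unit = {
      // each load step receives one object instead of a long positional list
      println(s"loading ${params.tableName}, overwrite = ${params.isOverwriteTable}")
    }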

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376355884
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
 ##########
 @@ -316,16 +315,15 @@ private[sql] case class CarbonProjectForUpdateCommand(
 
     val header = getHeader(carbonRelation, plan)
 
-    CarbonLoadDataCommand(
+    CarbonInsertIntoWithDf(
       Some(carbonRelation.identifier.getCarbonTableIdentifier.getDatabaseName),
       carbonRelation.identifier.getCarbonTableIdentifier.getTableName,
-      null,
-      Seq(),
       Map(("fileheader" -> header)),
       false,
       null,
-      Some(dataFrame),
-      Some(updateTableModel)).run(sparkSession)
+      dataFrame,
 
 Review comment:
   ok
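
The quoted diff cuts off after the dataFrame argument; based on the CarbonInsertIntoWithDf signature quoted later in this thread, the update flow's call presumably ends up in roughly this shape (a sketch with named arguments; header, dataFrame and updateTableModel come from the surrounding method in the diff):

    CarbonInsertIntoWithDf(
      databaseNameOp = Some(carbonRelation.identifier.getCarbonTableIdentifier.getDatabaseName),
      tableName = carbonRelation.identifier.getCarbonTableIdentifier.getTableName,
      options = Map("fileheader" -> header),
      isOverwriteTable = false,
      dataFrame = dataFrame,
      updateModel = Some(updateTableModel)
    ).process(sparkSession)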

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376355932
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
 ##########
 @@ -76,11 +76,16 @@ case class CarbonCreateTableAsSelectCommand(
         .createCarbonDataSourceHadoopRelation(sparkSession,
           TableIdentifier(tableName, Option(dbName)))
       // execute command to load data into carbon table
-      loadCommand = CarbonInsertIntoCommand(
-        carbonDataSourceHadoopRelation,
-        query,
-        overwrite = false,
-        partition = Map.empty)
+      loadCommand = CarbonInsertIntoCommand(Some(carbonDataSourceHadoopRelation
 
 Review comment:
   ok

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376356414
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
 ##########
 @@ -509,30 +514,17 @@ private class CarbonOutputWriter(path: String,
 
   // TODO Implement writesupport interface to support writing Row directly to recordwriter
   def writeCarbon(row: InternalRow): Unit = {
-    val data = new Array[AnyRef](fieldTypes.length + partitionData.length)
-    var i = 0
-    val fieldTypesLen = fieldTypes.length
-    while (i < fieldTypesLen) {
-      if (!row.isNullAt(i)) {
-        fieldTypes(i) match {
-          case StringType =>
-            data(i) = row.getString(i)
-          case d: DecimalType =>
-            data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
-          case other =>
-            data(i) = row.get(i, other)
-        }
-      }
-      i += 1
-    }
+    val totalLength = fieldTypes.length + partitionData.length
+    val data: Array[AnyRef] = CommonUtil.getObjectArrayFromInternalRowAndConvertComplexType(row,
+      fieldTypes,
+      totalLength)
     if (partitionData.length > 0) {
-      System.arraycopy(partitionData, 0, data, fieldTypesLen, partitionData.length)
+      System.arraycopy(partitionData, 0, data, fieldTypes.length, partitionData.length)
 
 Review comment:
   yes
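
The extracted helper's body is not shown in this thread; assuming it mirrors the inlined loop deleted above, the primitive-type part of such an InternalRow-to-object-array conversion looks like the sketch below (the complex-type handling implied by the helper's name is omitted, and the trailing totalLength - fieldTypes.length slots stay null for the partition data copied in afterwards):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.types.{DataType, DecimalType, StringType}

    def toObjectArray(row: InternalRow, fieldTypes: Array[DataType],
        totalLength: Int): Array[AnyRef] = {
      val data = new Array[AnyRef](totalLength)
      var i = 0
      while (i < fieldTypes.length) {
        if (!row.isNullAt(i)) {
          fieldTypes(i) match {
            case StringType => data(i) = row.getString(i)
            case d: DecimalType =>
              data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
            case other => data(i) = row.get(i, other)
          }
        }
        i += 1
      }
      data
    }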

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376356520
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
 ##########
 @@ -509,30 +514,17 @@ private class CarbonOutputWriter(path: String,
 
   // TODO Implement writesupport interface to support writing Row directly to recordwriter
   def writeCarbon(row: InternalRow): Unit = {
-    val data = new Array[AnyRef](fieldTypes.length + partitionData.length)
-    var i = 0
-    val fieldTypesLen = fieldTypes.length
-    while (i < fieldTypesLen) {
-      if (!row.isNullAt(i)) {
-        fieldTypes(i) match {
-          case StringType =>
-            data(i) = row.getString(i)
-          case d: DecimalType =>
-            data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
-          case other =>
-            data(i) = row.get(i, other)
-        }
-      }
-      i += 1
-    }
+    val totalLength = fieldTypes.length + partitionData.length
 
 Review comment:
   ok

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376357574
 
 

 ##########
 File path: processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
 ##########
 @@ -32,7 +33,7 @@
  * Generic DataType interface which will be used while data loading for complex types like Array &
  * Struct
  */
-public interface GenericDataType<T> {
+public interface GenericDataType<T> extends Serializable {
 
 Review comment:
   ok done

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376357834
 
 

 ##########
 File path: processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
 ##########
 @@ -110,10 +113,10 @@ private AbstractDataLoadProcessorStep buildInternalForNoSort(CarbonIterator[] in
    */
   private AbstractDataLoadProcessorStep buildInternalWithNoConverter(
       CarbonIterator[] inputIterators, CarbonDataLoadConfiguration configuration,
-      SortScopeOptions.SortScope sortScope) {
+      SortScopeOptions.SortScope sortScope, boolean withoutReArrange) {
 
 Review comment:
  That is a bigger change; maybe refactor it in another PR

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376361675
 
 

 ##########
 File path: processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
 ##########
 @@ -32,7 +33,7 @@
  * Generic DataType interface which will be used while data loading for complex types like Array &
  * Struct
  */
-public interface GenericDataType<T> {
+public interface GenericDataType<T> extends Serializable {
 
 Review comment:
  The primitive type's logger was not used, so the field itself has been removed

[GitHub] [carbondata] CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#issuecomment-583398606
 
 
   Build Failed  with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/179/
   

[GitHub] [carbondata] CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#issuecomment-583440865
 
 
   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1882/
   

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376471953
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##########
 @@ -17,85 +17,518 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
-import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataCommand}
-import org.apache.spark.storage.StorageLevel
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
-
-case class CarbonInsertIntoCommand(
-    relation: CarbonDatasourceHadoopRelation,
-    child: LogicalPlan,
-    overwrite: Boolean,
-    partition: Map[String, Option[String]])
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.CausedBy
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/*
+ * Insert into command without a DataFrame, using only the logical plan.
+ */
+case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
+    tableName: String,
+    options: Map[String, String],
+    isOverwriteTable: Boolean,
+    var logicalPlan: LogicalPlan,
+    var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
+    var inputSqlString: String = null,
+    var tableInfoOp: Option[TableInfo] = None,
+    var internalOptions: Map[String, String] = Map.empty,
+    var partition: Map[String, Option[String]] = Map.empty,
+    var operationContext: OperationContext = new OperationContext)
   extends AtomicRunnableCommand {
 
-  var loadCommand: CarbonLoadDataCommand = _
+  var table: CarbonTable = _
+
+  var logicalPartitionRelation: LogicalRelation = _
+
+  var sizeInBytes: Long = _
+
+  var currPartitions: util.List[PartitionSpec] = _
+
+  var parentTablePath: String = _
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  var scanResultRdd: RDD[InternalRow] = _
+
+  var timeStampFormat: SimpleDateFormat = _
+
+  var dateFormat: SimpleDateFormat = _
+
+  var finalPartition: Map[String, Option[String]] = Map.empty
+
+  var isInsertIntoWithConverterFlow: Boolean = false
+
+  var dataFrame: DataFrame = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    setAuditTable(relation.carbonTable.getDatabaseName, relation.carbonTable.getTableName)
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    def containsLimit(plan: LogicalPlan): Boolean = {
-      plan find {
-        case limit: GlobalLimit => true
-        case other => false
-      } isDefined
+    if (tableInfoOp.isEmpty) {
+      throw new RuntimeException("table info must be present when logical relation exists")
+    }
-
+    // If the logical plan is unresolved, convert it to a resolved plan.
+    dataFrame = Dataset.ofRows(sparkSession, logicalPlan)
+    logicalPlan = dataFrame.queryExecution.analyzed
+    var isInsertFromTable = false
+    logicalPlan.collect {
+      case _: LogicalRelation =>
+        isInsertFromTable = true
+    }
+    // Currently projection re-ordering is based on the schema ordinal.
+    // For some alter table scenarios the schema ordinal logic cannot be applied,
+    // so send those to the old flow.
+    // TODO: handle this in the future; it should use the new flow.
+    if (!isInsertFromTable || isAlteredSchema(tableInfoOp.get.getFactTable)) {
+      isInsertIntoWithConverterFlow = true
+    }
+    if (isInsertIntoWithConverterFlow) {
+      return Seq.empty
+    }
+    setAuditTable(CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession), tableName)
     ThreadLocalSessionInfo
       .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
-    val isPersistEnabledUserValue = CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED,
-        CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED_DEFAULT)
-    val isPersistRequired =
-      isPersistEnabledUserValue.equalsIgnoreCase("true") || containsLimit(child)
-    val df =
-      if (isPersistRequired) {
-        LOGGER.info("Persist enabled for Insert operation")
-        Dataset.ofRows(sparkSession, child).persist(
-          StorageLevel.fromString(
-            CarbonProperties.getInstance.getInsertIntoDatasetStorageLevel))
+    val (sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition) = CommonLoadUtils
+      .processMetadataCommon(
+        sparkSession,
+        databaseNameOp,
+        tableName,
+        tableInfoOp,
+        partition)
+    this.sizeInBytes = sizeInBytes
+    this.table = table
+    this.logicalPartitionRelation = logicalPartitionRelation
+    this.finalPartition = finalPartition
+    setAuditTable(dbName, tableName)
+    Seq.empty
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
 
 Review comment:
   ok, refactored
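
The resolve-then-inspect step at the top of the new processMetadata is a generic Spark pattern: force analysis of a possibly-unresolved plan, then pattern-match the analyzed tree. A standalone sketch (note that Dataset.ofRows is private[sql], which is one reason the command lives under the org.apache.spark.sql package):

    package org.apache.spark.sql.execution.command.management

    import org.apache.spark.sql.{Dataset, SparkSession}
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.datasources.LogicalRelation

    object PlanInspection {
      // Resolve the plan through the session's analyzer, then check whether the
      // source side scans a table (LogicalRelation) rather than inline rows.
      def isInsertFromTable(spark: SparkSession, plan: LogicalPlan): Boolean = {
        val analyzed = Dataset.ofRows(spark, plan).queryExecution.analyzed
        analyzed.collect { case r: LogicalRelation => r }.nonEmpty
      }
    }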

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376680696
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala
 ##########
 @@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import java.text.SimpleDateFormat
+import java.util
+
+import org.apache.spark.sql.{AnalysisException, CarbonUtils, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{DataLoadTableFileMapping, UpdateTableModel}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, SparkUtil}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+
+/*
+ * Insert into command with a DataFrame; does not use the logical plan.
+ */
+case class CarbonInsertIntoWithDf(databaseNameOp: Option[String],
+    tableName: String,
+    options: Map[String, String],
+    isOverwriteTable: Boolean,
+    var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
+    var dataFrame: DataFrame,
+    var inputSqlString: String = null,
+    var updateModel: Option[UpdateTableModel] = None,
+    var tableInfoOp: Option[TableInfo] = None,
+    var internalOptions: Map[String, String] = Map.empty,
+    var partition: Map[String, Option[String]] = Map.empty,
+    var operationContext: OperationContext = new OperationContext) {
+
+  var table: CarbonTable = _
+
+  var logicalPartitionRelation: LogicalRelation = _
+
+  var sizeInBytes: Long = _
+
+  var currPartitions: util.List[PartitionSpec] = _
+
+  var parentTablePath: String = _
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  var finalPartition: Map[String, Option[String]] = Map.empty
+
+  var timeStampFormat: SimpleDateFormat = _
+
+  var dateFormat: SimpleDateFormat = _
+
+  def process(sparkSession: SparkSession): Seq[Row] = {
+    ThreadLocalSessionInfo
+      .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
+    val (sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition) = CommonLoadUtils
+      .processMetadataCommon(
+        sparkSession,
+        databaseNameOp,
+        tableName,
+        tableInfoOp,
+        partition)
+    this.sizeInBytes = sizeInBytes
+    this.table = table
+    this.logicalPartitionRelation = logicalPartitionRelation
+    this.finalPartition = finalPartition
+    val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    carbonProperty.addProperty("zookeeper.enable.lock", "false")
+    val factPath = ""
+    currPartitions = CommonLoadUtils.getCurrentParitions(sparkSession, table)
+    CommonLoadUtils.setNumberOfCoresWhileLoading(sparkSession, carbonProperty)
+    val optionsFinal: util.Map[String, String] =
+      CommonLoadUtils.getFinalLoadOptions(carbonProperty, table, options)
 
 Review comment:
   ok done
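
The consolidation into CommonLoadUtils.getFinalLoadOptions follows the usual precedence-merge shape: statement-level options override table-level properties, which override system defaults. A hedged sketch of that shape only (names illustrative; the real method builds a java.util.Map from CarbonProperties and the table):

    // Later maps win on key collisions, so statement options take precedence.
    def mergeLoadOptions(
        systemDefaults: Map[String, String],
        tableProperties: Map[String, String],
        statementOptions: Map[String, String]): Map[String, String] = {
      systemDefaults ++ tableProperties ++ statementOptions
    }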

[GitHub] [carbondata] CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#issuecomment-584011058
 
 
   Build Failed  with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/207/
   

[GitHub] [carbondata] CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#issuecomment-584037967
 
 
   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1909/
   

[GitHub] [carbondata] CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#issuecomment-584060297
 
 
   Build Failed  with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/210/
   
