[GitHub] [carbondata] ajantha-bhat opened a new pull request #3538: [WIP] Separate Insert and load to later optimize insert.


[GitHub] [carbondata] QiangCai commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

URL: https://github.com/apache/carbondata/pull/3538#discussion_r378728741
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##########
 @@ -17,85 +17,517 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
-import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataCommand}
-import org.apache.spark.storage.StorageLevel
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
-
-case class CarbonInsertIntoCommand(
-    relation: CarbonDatasourceHadoopRelation,
-    child: LogicalPlan,
-    overwrite: Boolean,
-    partition: Map[String, Option[String]])
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, CarbonToSparkAdapter, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.CausedBy
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/*
+ * Insert into command that uses only the logical plan, without a DataFrame.
+ */
+case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
+    tableName: String,
+    options: Map[String, String],
+    isOverwriteTable: Boolean,
+    var logicalPlan: LogicalPlan,
+    var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
+    var inputSqlString: String = null,
+    var tableInfoOp: Option[TableInfo] = None,
+    var internalOptions: Map[String, String] = Map.empty,
+    var partition: Map[String, Option[String]] = Map.empty,
+    var operationContext: OperationContext = new OperationContext)
   extends AtomicRunnableCommand {
 
-  var loadCommand: CarbonLoadDataCommand = _
+  var table: CarbonTable = _
+
+  var logicalPartitionRelation: LogicalRelation = _
+
+  var sizeInBytes: Long = _
+
+  var currPartitions: util.List[PartitionSpec] = _
+
+  var parentTablePath: String = _
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  var scanResultRdd: RDD[InternalRow] = _
+
+  var timeStampFormat: SimpleDateFormat = _
+
+  var dateFormat: SimpleDateFormat = _
+
+  var finalPartition: Map[String, Option[String]] = Map.empty
+
+  var isInsertIntoWithConverterFlow: Boolean = false
+
+  var dataFrame: DataFrame = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    setAuditTable(relation.carbonTable.getDatabaseName, relation.carbonTable.getTableName)
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    def containsLimit(plan: LogicalPlan): Boolean = {
-      plan find {
-        case limit: GlobalLimit => true
-        case other => false
-      } isDefined
+    if (tableInfoOp.isEmpty) {
+      throw new RuntimeException("table info must be present when the logical relation exists")
     }
-
+    // If logical plan is unresolved, need to convert it to resolved.
+    dataFrame = Dataset.ofRows(sparkSession, logicalPlan)
+    logicalPlan = dataFrame.queryExecution.analyzed
+    // Currently projection re-ordering is based on the schema ordinal.
+    // For some alter table scenarios the schema ordinal logic cannot be applied,
+    // so those cases are sent to the old flow.
+    // TODO: Handle alter table in future, this also must use new flow.
+    if (CarbonProperties.isBadRecordHandlingEnabledForInsert ||
+        isAlteredSchema(tableInfoOp.get.getFactTable)) {
+      isInsertIntoWithConverterFlow = true
+    }
+    if (isInsertIntoWithConverterFlow) {
+      return Seq.empty
+    }
+    setAuditTable(CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession), tableName)
     ThreadLocalSessionInfo
       .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
-    val isPersistEnabledUserValue = CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED,
-        CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED_DEFAULT)
-    val isPersistRequired =
-      isPersistEnabledUserValue.equalsIgnoreCase("true") || containsLimit(child)
-    val df =
-      if (isPersistRequired) {
-        LOGGER.info("Persist enabled for Insert operation")
-        Dataset.ofRows(sparkSession, child).persist(
-          StorageLevel.fromString(
-            CarbonProperties.getInstance.getInsertIntoDatasetStorageLevel))
+    val (sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition) = CommonLoadUtils
+      .processMetadataCommon(
+        sparkSession,
+        databaseNameOp,
+        tableName,
+        tableInfoOp,
+        partition)
+    this.sizeInBytes = sizeInBytes
+    this.table = table
+    this.logicalPartitionRelation = logicalPartitionRelation
+    this.finalPartition = finalPartition
+    setAuditTable(dbName, tableName)
+    Seq.empty
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (isInsertIntoWithConverterFlow) {
+      return CarbonInsertIntoWithDf(
+        databaseNameOp = databaseNameOp,
+        tableName = tableName,
+        options = options,
+        isOverwriteTable = isOverwriteTable,
+        dimFilesPath = dimFilesPath,
+        dataFrame = dataFrame,
+        inputSqlString = inputSqlString,
+        updateModel = None,
+        tableInfoOp = tableInfoOp,
+        internalOptions = internalOptions,
+        partition = partition).process(sparkSession)
+    }
+    val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false")
+    val factPath = ""
+    currPartitions = CommonLoadUtils.getCurrentParitions(sparkSession, table)
+    CommonLoadUtils.setNumberOfCoresWhileLoading(sparkSession)
+    val optionsFinal: util.Map[String, String] = CommonLoadUtils.getFinalLoadOptions(table, options)
+    val carbonLoadModel: CarbonLoadModel = CommonLoadUtils.prepareLoadModel(
+      hadoopConf = hadoopConf,
+      factPath = factPath,
+      optionsFinal = optionsFinal,
+      parentTablePath = parentTablePath,
+      table = table,
+      isDataFrame = true,
+      internalOptions = internalOptions,
+      partition = partition,
+      options = options)
+    val (tf, df) = CommonLoadUtils.getTimeAndDateFormatFromLoadModel(carbonLoadModel)
+    timeStampFormat = tf
+    dateFormat = df
+    val partitionInfo = tableInfoOp.get.getFactTable.getPartitionInfo
+    val partitionColumnSchema =
+      if (partitionInfo != null && partitionInfo.getColumnSchemaList.size() != 0) {
+        partitionInfo.getColumnSchemaList.asScala
+      } else {
+        null
+      }
+    val convertedStaticPartition = getConvertedStaticPartitionMap(partitionColumnSchema)
+    val (reArrangedIndex, selectedColumnSchema) = getReArrangedIndexAndSelectedSchema(
+      tableInfoOp.get,
+      partitionColumnSchema)
+    val newLogicalPlan = getReArrangedLogicalPlan(
+      reArrangedIndex,
+      selectedColumnSchema,
+      convertedStaticPartition)
+    scanResultRdd = sparkSession.sessionState.executePlan(newLogicalPlan).toRdd
+    if (logicalPartitionRelation != null) {
+      logicalPartitionRelation =
+        getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalPartitionRelation)
+    }
+    // Delete stale segment folders that are not in table status but are physically present in
+    // the Fact folder
+    LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName")
+    TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
+    var isUpdateTableStatusRequired = false
+    val uuid = ""
+    try {
+      val (tableDataMaps, dataMapOperationContext) =
+        CommonLoadUtils.firePreLoadEvents(sparkSession,
+          carbonLoadModel,
+          uuid,
+          table,
+          isOverwriteTable,
+          operationContext)
+      // Clean up the old invalid segment data before creating a new entry for new load.
+      SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions)
+      // add the start entry for the new load in the table status file
+      if (!table.isHivePartitionTable) {
+        CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
+          carbonLoadModel,
+          isOverwriteTable)
+        isUpdateTableStatusRequired = true
+      }
+      if (isOverwriteTable) {
+        LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
+      }
+      // Create table and metadata folders if not exist
+      if (carbonLoadModel.isCarbonTransactionalTable) {
+        val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
+        if (!FileFactory.isFileExist(metadataDirectoryPath)) {
+          FileFactory.mkdirs(metadataDirectoryPath)
+        }
       } else {
-        Dataset.ofRows(sparkSession, child)
+        carbonLoadModel.setSegmentId(System.nanoTime().toString)
       }
-    val header = relation.tableSchema.get.fields.map(_.name).mkString(",")
-    loadCommand = CarbonLoadDataCommand(
-      databaseNameOp = Some(relation.carbonRelation.databaseName),
-      tableName = relation.carbonRelation.tableName,
-      factPathFromUser = null,
-      dimFilesPath = Seq(),
-      options = scala.collection.immutable.Map("fileheader" -> header),
-      isOverwriteTable = overwrite,
-      inputSqlString = null,
-      dataFrame = Some(df),
-      updateModel = None,
-      tableInfoOp = None,
-      internalOptions = Map.empty,
-      partition = partition)
-    val load = loadCommand.processMetadata(sparkSession)
-    if (isPersistRequired) {
-      df.unpersist()
-    }
-    load
+      val partitionStatus = SegmentStatus.SUCCESS
+      val loadParams = CarbonLoadParams(sparkSession,
+        tableName,
+        sizeInBytes,
+        isOverwriteTable,
+        carbonLoadModel,
+        hadoopConf,
+        logicalPartitionRelation,
+        dateFormat,
+        timeStampFormat,
+        options,
+        finalPartition,
+        currPartitions,
+        partitionStatus,
+        None,
+        Some(scanResultRdd),
+        None,
+        operationContext)
+      LOGGER.info("Sort Scope : " + carbonLoadModel.getSortScope)
+      val (rows, loadResult) = insertData(loadParams)
+      val info = CommonLoadUtils.makeAuditInfo(loadResult)
+      setAuditInfo(info)
+      CommonLoadUtils.firePostLoadEvents(sparkSession,
+        carbonLoadModel,
+        tableDataMaps,
+        dataMapOperationContext,
+        table,
+        operationContext)
+    } catch {
+      case CausedBy(ex: NoRetryException) =>
+        // update the load entry in table status file for changing the status to marked for delete
+        if (isUpdateTableStatusRequired) {
+          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
+        }
+        LOGGER.error(s"Dataload failure for $dbName.$tableName", ex)
+        throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
+      // In case of event related exception
+      case preEventEx: PreEventException =>
+        LOGGER.error(s"Dataload failure for $dbName.$tableName", preEventEx)
+        throw new AnalysisException(preEventEx.getMessage)
+      case ex: Exception =>
+        LOGGER.error(ex)
+        // update the load entry in table status file for changing the status to marked for delete
+        if (isUpdateTableStatusRequired) {
+          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
+        }
+        throw ex
+    }
+    Seq.empty
   }
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
-    if (null != loadCommand) {
-      val rows = loadCommand.processData(sparkSession)
-      setAuditInfo(loadCommand.auditInfo)
-      rows
+
+  def getReArrangedLogicalPlan(
+      reArrangedIndex: Seq[Int],
+      selectedColumnSchema: Seq[ColumnSchema],
+      convertedStaticPartition: mutable.Map[String, AnyRef]): LogicalPlan = {
+    var processedProject: Boolean = false
+    // check whether the first node is already a Project
+    logicalPlan match {
+      case _: Project =>
+      // project is already present as first node
+      case _ =>
+        // If project is not present, add the projection to re-arrange it
+        logicalPlan = Project(logicalPlan.output, logicalPlan)
+    }
+    // Re-arrange the project as per columnSchema
+    val newLogicalPlan = logicalPlan.transformDown {
+      //      case logicalRelation: LogicalRelation =>
+      //        getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalRelation)
+      //      case hiveRelation: HiveTableRelation =>
+      //        getReArrangedSchemaHiveRelation(reArrangedIndex, hiveRelation)
+      case p: Project =>
 
 Review comment:
   case p: Project if !processedProject =>
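
   A minimal, self-contained sketch of this suggestion (spark-catalyst only, not the PR's exact code), assuming a reArrangedIndex computed as in CarbonInsertIntoCommand:

       import org.apache.spark.sql.catalyst.expressions.NamedExpression
       import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}

       def reArrangeTopProject(plan: LogicalPlan, reArrangedIndex: Seq[Int]): LogicalPlan = {
         var processedProject = false
         plan.transformDown {
           // the guard keeps the rule from firing again on nested Project nodes
           case p: Project if !processedProject =>
             processedProject = true
             // pick the projections in the re-arranged (table schema) order
             val reordered: Seq[NamedExpression] = reArrangedIndex.map(p.projectList)
             Project(reordered, p.child)
         }
       }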


[GitHub] [carbondata] QiangCai commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

URL: https://github.com/apache/carbondata/pull/3538#discussion_r378722658
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 ##########
 @@ -358,19 +359,41 @@ object CarbonDataRDDFactory {
             }
           }
         } else {
-          status = if (dataFrame.isEmpty && isSortTable &&
-                       carbonLoadModel.getRangePartitionColumn != null &&
-                       (sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT) ||
-                        sortScope.equals(SortScopeOptions.SortScope.LOCAL_SORT))) {
-            DataLoadProcessBuilderOnSpark
-              .loadDataUsingRangeSort(sqlContext.sparkSession, carbonLoadModel, hadoopConf)
-          } else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
-            DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkSession,
-              dataFrame, carbonLoadModel, hadoopConf)
-          } else if (dataFrame.isDefined) {
-            loadDataFrame(sqlContext, dataFrame, carbonLoadModel)
+          status = if (scanResultRdd.isDefined) {
 
 Review comment:
   Now this if/else block has become bigger; better to split some of the branches out in the future.


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

URL: https://github.com/apache/carbondata/pull/3538#discussion_r378751755
 
 

 ##########
 File path: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
 ##########
 @@ -150,10 +150,14 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
     sql("drop table if exists load32000chardata")
     sql("drop table if exists load32000chardata_dup")
     sql("CREATE TABLE load32000chardata(dim1 String, dim2 String, mes1 int) STORED AS carbondata")
-    sql("CREATE TABLE load32000chardata_dup(dim1 String, dim2 String, mes1 int) STORED AS carbondata")
+    sql("CREATE TABLE load32000chardata_dup(dim1 String, dim2 String, mes1 int) STORED AS " +
+      "carbondata tblproperties('local_dictionary_enable'='false')")
     sql(s"LOAD DATA LOCAL INPATH '$testdata' into table load32000chardata OPTIONS('FILEHEADER'='dim1,dim2,mes1')")
+    // Previously the converter step checked for values longer than 32k and threw an exception.
+    // Now, with local dictionary enabled, the insert will not fail.
+    // With local dictionary disabled, the insert fails at the write step.
     intercept[Exception] {
-      sql("insert into load32000chardata_dup select dim1,concat(load32000chardata.dim2,'aaaa'),mes1 from load32000chardata").show()
+      sql("insert into load32000chardata_dup select dim1,concat(load32000chardata.dim2,load32000chardata.dim2),mes1 from load32000chardata").show()
 
 Review comment:
   In the new flow there is no converter step, so the string is no longer checked against the 32k limit. The write step did not fail because, after compression, the value is smaller than 32k, so I concatenate dim2 with itself (32k + 32k) so that it still fails after compression.
   
   I can also set bad-record handling and keep it the same as the old test case.


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

URL: https://github.com/apache/carbondata/pull/3538#discussion_r378754084
 
 

 ##########
 File path: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
 ##########
 @@ -150,10 +150,14 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
     sql("drop table if exists load32000chardata")
     sql("drop table if exists load32000chardata_dup")
     sql("CREATE TABLE load32000chardata(dim1 String, dim2 String, mes1 int) STORED AS carbondata")
-    sql("CREATE TABLE load32000chardata_dup(dim1 String, dim2 String, mes1 int) STORED AS carbondata")
+    sql("CREATE TABLE load32000chardata_dup(dim1 String, dim2 String, mes1 int) STORED AS " +
+      "carbondata tblproperties('local_dictionary_enable'='false')")
     sql(s"LOAD DATA LOCAL INPATH '$testdata' into table load32000chardata OPTIONS('FILEHEADER'='dim1,dim2,mes1')")
+    // Previously the converter step checked for values longer than 32k and threw an exception.
+    // Now, with local dictionary enabled, the insert will not fail.
+    // With local dictionary disabled, the insert fails at the write step.
     intercept[Exception] {
-      sql("insert into load32000chardata_dup select dim1,concat(load32000chardata.dim2,'aaaa'),mes1 from load32000chardata").show()
+      sql("insert into load32000chardata_dup select dim1,concat(load32000chardata.dim2,load32000chardata.dim2),mes1 from load32000chardata").show()
 
 Review comment:
   I have set bad-record handling and made it the same as the old test case. Otherwise nobody else will understand :)
   
   
   


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

URL: https://github.com/apache/carbondata/pull/3538#discussion_r378771038
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
 ##########
 @@ -834,4 +843,179 @@ object CommonUtil {
     displaySize
   }
 
+  def getObjectArrayFromInternalRowAndConvertComplexType(row: InternalRow,
+      fieldTypes: Seq[DataType],
+      outputArrayLength: Int): Array[AnyRef] = {
+    val data = new Array[AnyRef](outputArrayLength)
+    var i = 0
+    val fieldTypesLen = fieldTypes.length
+    while (i < fieldTypesLen) {
+      if (!row.isNullAt(i)) {
+        fieldTypes(i) match {
+          case StringType =>
+            data(i) = row.getString(i)
+          case d: DecimalType =>
+            data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+          case arrayType : ArrayType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getArray(i), arrayType)
+          case structType : StructType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getStruct(i,
+              structType.fields.length), structType)
+          case mapType : MapType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getMap(i), mapType)
+          case other =>
+            data(i) = row.get(i, other)
+        }
+      }
+      i += 1
+    }
+    data
+  }
+
+  /**
+   * After converting complex objects to carbon objects, need to convert to byte array
+   *
+   * @param row
+   * @param fields
+   * @param dataFieldsWithComplexDataType
+   * @return
+   */
+  def getObjectArrayFromInternalRowAndConvertComplexTypeForGlobalSort(
+      row: InternalRow,
+      fields: Seq[StructField],
+      dataFieldsWithComplexDataType: Map[String, GenericDataType[_]]): Array[AnyRef] = {
+    val data = new Array[AnyRef](fields.size)
+    val badRecordLogHolder = new BadRecordLogHolder();
+    var i = 0
+    val fieldTypesLen = fields.length
+    while (i < fieldTypesLen) {
+      if (!row.isNullAt(i)) {
+        fields(i).dataType match {
+          case StringType =>
+            data(i) = DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(row.getString(i),
+              DataTypes.STRING)
+          case d: DecimalType =>
+            data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+          case arrayType : ArrayType =>
+            val result = convertSparkComplexTypeToCarbonObject(row.get(i, arrayType), arrayType)
+            // convert carbon complex object to byte array
+            val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
+            val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray)
+            dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[ArrayDataType]
+              .writeByteArray(result.asInstanceOf[ArrayObject],
+                dataOutputStream,
+                badRecordLogHolder)
+            dataOutputStream.close()
+            data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
 
 Review comment:
   Both branches use different asInstanceOf[] casts, so an extracted method would still need the if/else; it is not useful here.
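
   For the stream plumbing itself (the per-case casts stay either way), one possible shape, sketched here rather than taken from the PR, is a small helper that each branch calls with its own writeByteArray lambda:

       import java.io.{ByteArrayOutputStream, DataOutputStream}

       def toByteArray(write: DataOutputStream => Unit): Array[Byte] = {
         val byteStream = new ByteArrayOutputStream()
         val dataOutputStream = new DataOutputStream(byteStream)
         // let the caller serialize its carbon complex object, then collect the bytes
         try write(dataOutputStream) finally dataOutputStream.close()
         byteStream.toByteArray
       }

       // e.g. data(i) = toByteArray { out =>
       //   dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[ArrayDataType]
       //     .writeByteArray(result.asInstanceOf[ArrayObject], out, badRecordLogHolder)
       // }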


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

URL: https://github.com/apache/carbondata/pull/3538#discussion_r378771218
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
 ##########
 @@ -834,4 +843,179 @@ object CommonUtil {
     displaySize
   }
 
+  def getObjectArrayFromInternalRowAndConvertComplexType(row: InternalRow,
+      fieldTypes: Seq[DataType],
+      outputArrayLength: Int): Array[AnyRef] = {
+    val data = new Array[AnyRef](outputArrayLength)
+    var i = 0
+    val fieldTypesLen = fieldTypes.length
+    while (i < fieldTypesLen) {
+      if (!row.isNullAt(i)) {
+        fieldTypes(i) match {
+          case StringType =>
+            data(i) = row.getString(i)
+          case d: DecimalType =>
+            data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+          case arrayType : ArrayType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getArray(i), arrayType)
+          case structType : StructType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getStruct(i,
+              structType.fields.length), structType)
+          case mapType : MapType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getMap(i), mapType)
+          case other =>
+            data(i) = row.get(i, other)
+        }
+      }
+      i += 1
+    }
+    data
+  }
+
+  /**
+   * After converting complex objects to carbon objects, need to convert to byte array
+   *
+   * @param row
+   * @param fields
+   * @param dataFieldsWithComplexDataType
+   * @return
+   */
+  def getObjectArrayFromInternalRowAndConvertComplexTypeForGlobalSort(
+      row: InternalRow,
+      fields: Seq[StructField],
+      dataFieldsWithComplexDataType: Map[String, GenericDataType[_]]): Array[AnyRef] = {
+    val data = new Array[AnyRef](fields.size)
+    val badRecordLogHolder = new BadRecordLogHolder();
+    var i = 0
+    val fieldTypesLen = fields.length
+    while (i < fieldTypesLen) {
+      if (!row.isNullAt(i)) {
+        fields(i).dataType match {
+          case StringType =>
+            data(i) = DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(row.getString(i),
+              DataTypes.STRING)
+          case d: DecimalType =>
+            data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+          case arrayType : ArrayType =>
+            val result = convertSparkComplexTypeToCarbonObject(row.get(i, arrayType), arrayType)
+            // convert carbon complex object to byte array
+            val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
+            val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray)
+            dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[ArrayDataType]
+              .writeByteArray(result.asInstanceOf[ArrayObject],
+                dataOutputStream,
+                badRecordLogHolder)
+            dataOutputStream.close()
+            data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
+          case structType : StructType =>
+            val result = convertSparkComplexTypeToCarbonObject(row.get(i, structType), structType)
+            // convert carbon complex object to byte array
+            val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
+            val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray)
+            dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[StructDataType]
+              .writeByteArray(result.asInstanceOf[StructObject],
+                dataOutputStream,
+                badRecordLogHolder)
+            dataOutputStream.close()
+            data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
 
 Review comment:
   same as above


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

URL: https://github.com/apache/carbondata/pull/3538#discussion_r378771252
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
 ##########
 @@ -834,4 +843,179 @@ object CommonUtil {
     displaySize
   }
 
+  def getObjectArrayFromInternalRowAndConvertComplexType(row: InternalRow,
+      fieldTypes: Seq[DataType],
+      outputArrayLength: Int): Array[AnyRef] = {
+    val data = new Array[AnyRef](outputArrayLength)
+    var i = 0
+    val fieldTypesLen = fieldTypes.length
+    while (i < fieldTypesLen) {
+      if (!row.isNullAt(i)) {
+        fieldTypes(i) match {
+          case StringType =>
+            data(i) = row.getString(i)
+          case d: DecimalType =>
+            data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+          case arrayType : ArrayType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getArray(i), arrayType)
+          case structType : StructType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getStruct(i,
+              structType.fields.length), structType)
+          case mapType : MapType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getMap(i), mapType)
+          case other =>
+            data(i) = row.get(i, other)
+        }
+      }
+      i += 1
+    }
+    data
+  }
+
+  /**
+   * After converting complex objects to carbon objects, need to convert to byte array
+   *
+   * @param row
+   * @param fields
+   * @param dataFieldsWithComplexDataType
+   * @return
+   */
+  def getObjectArrayFromInternalRowAndConvertComplexTypeForGlobalSort(
+      row: InternalRow,
+      fields: Seq[StructField],
+      dataFieldsWithComplexDataType: Map[String, GenericDataType[_]]): Array[AnyRef] = {
+    val data = new Array[AnyRef](fields.size)
+    val badRecordLogHolder = new BadRecordLogHolder();
+    var i = 0
+    val fieldTypesLen = fields.length
+    while (i < fieldTypesLen) {
+      if (!row.isNullAt(i)) {
+        fields(i).dataType match {
+          case StringType =>
+            data(i) = DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(row.getString(i),
+              DataTypes.STRING)
+          case d: DecimalType =>
+            data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+          case arrayType : ArrayType =>
+            val result = convertSparkComplexTypeToCarbonObject(row.get(i, arrayType), arrayType)
+            // convert carbon complex object to byte array
+            val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
+            val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray)
+            dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[ArrayDataType]
+              .writeByteArray(result.asInstanceOf[ArrayObject],
+                dataOutputStream,
+                badRecordLogHolder)
+            dataOutputStream.close()
+            data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
+          case structType : StructType =>
+            val result = convertSparkComplexTypeToCarbonObject(row.get(i, structType), structType)
+            // convert carbon complex object to byte array
+            val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
+            val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray)
+            dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[StructDataType]
+              .writeByteArray(result.asInstanceOf[StructObject],
+                dataOutputStream,
+                badRecordLogHolder)
+            dataOutputStream.close()
+            data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
+          case mapType : MapType =>
+            val result = convertSparkComplexTypeToCarbonObject(row.get(i, mapType), mapType)
+            // convert carbon complex object to byte array
+            val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
+            val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray)
+            dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[ArrayDataType]
+              .writeByteArray(result.asInstanceOf[ArrayObject],
+                dataOutputStream,
+                badRecordLogHolder)
+            dataOutputStream.close()
+            data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
 
 Review comment:
   same as above
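
   For readers following this hunk, a small illustrative usage of its first helper, getObjectArrayFromInternalRowAndConvertComplexType, with a hand-built InternalRow (the values and setup below are assumptions, not from the PR):

       import org.apache.spark.sql.catalyst.InternalRow
       import org.apache.spark.sql.types.{Decimal, DecimalType, IntegerType, StringType}
       import org.apache.spark.unsafe.types.UTF8String

       import org.apache.carbondata.spark.util.CommonUtil

       val fieldTypes = Seq(StringType, DecimalType(10, 2), IntegerType)
       // an InternalRow carries catalyst-internal values: UTF8String for strings, Decimal for decimals
       val row = InternalRow.fromSeq(Seq(UTF8String.fromString("abc"), Decimal(BigDecimal("12.34")), 5))
       val data = CommonUtil.getObjectArrayFromInternalRowAndConvertComplexType(
         row, fieldTypes, outputArrayLength = fieldTypes.length)
       // data(0) is a java.lang.String, data(1) a java.math.BigDecimal, data(2) a boxed Int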


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

URL: https://github.com/apache/carbondata/pull/3538#discussion_r378771800
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 ##########
 @@ -358,19 +359,41 @@ object CarbonDataRDDFactory {
             }
           }
         } else {
-          status = if (dataFrame.isEmpty && isSortTable &&
-                       carbonLoadModel.getRangePartitionColumn != null &&
-                       (sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT) ||
-                        sortScope.equals(SortScopeOptions.SortScope.LOCAL_SORT))) {
-            DataLoadProcessBuilderOnSpark
-              .loadDataUsingRangeSort(sqlContext.sparkSession, carbonLoadModel, hadoopConf)
-          } else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
-            DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkSession,
-              dataFrame, carbonLoadModel, hadoopConf)
-          } else if (dataFrame.isDefined) {
-            loadDataFrame(sqlContext, dataFrame, carbonLoadModel)
+          status = if (scanResultRdd.isDefined) {
 
 Review comment:
   ok


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

URL: https://github.com/apache/carbondata/pull/3538#discussion_r378800852
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##########
 @@ -17,85 +17,517 @@
 
 [... same diff hunk as quoted in full earlier in the thread; trailing context repeated below ...]
+    // Re-arrange the project as per columnSchema
+    val newLogicalPlan = logicalPlan.transformDown {
+      case p: Project =>
+        var oldProjectionList = p.projectList
+        if (!processedProject) {
 
 Review comment:
   done


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

URL: https://github.com/apache/carbondata/pull/3538#discussion_r378800899
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##########
 @@ -17,85 +17,517 @@
 
 [... same diff hunk as quoted in full earlier in the thread ...]
+    val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false")
+    val factPath = ""
+    currPartitions = CommonLoadUtils.getCurrentParitions(sparkSession, table)
+    CommonLoadUtils.setNumberOfCoresWhileLoading(sparkSession)
+    val optionsFinal: util.Map[String, String] = CommonLoadUtils.getFinalLoadOptions(table, options)
+    val carbonLoadModel: CarbonLoadModel = CommonLoadUtils.prepareLoadModel(
+      hadoopConf = hadoopConf,
+      factPath = factPath,
+      optionsFinal = optionsFinal,
+      parentTablePath = parentTablePath,
+      table = table,
+      isDataFrame = true,
+      internalOptions = internalOptions,
+      partition = partition,
+      options = options)
+    val (tf, df) = CommonLoadUtils.getTimeAndDateFormatFromLoadModel(carbonLoadModel)
+    timeStampFormat = tf
+    dateFormat = df
+    val partitionInfo = tableInfoOp.get.getFactTable.getPartitionInfo
+    val partitionColumnSchema =
+      if (partitionInfo != null && partitionInfo.getColumnSchemaList.size() != 0) {
+        partitionInfo.getColumnSchemaList.asScala
+      } else {
+        null
+      }
+    val convertedStaticPartition = getConvertedStaticPartitionMap(partitionColumnSchema)
+    val (reArrangedIndex, selectedColumnSchema) = getReArrangedIndexAndSelectedSchema(
+      tableInfoOp.get,
+      partitionColumnSchema)
+    val newLogicalPlan = getReArrangedLogicalPlan(
+      reArrangedIndex,
+      selectedColumnSchema,
+      convertedStaticPartition)
+    scanResultRdd = sparkSession.sessionState.executePlan(newLogicalPlan).toRdd
+    if (logicalPartitionRelation != null) {
+      logicalPartitionRelation =
+        getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalPartitionRelation)
+    }
+    // Delete stale segment folders that are not in table status but are physically present in
+    // the Fact folder
+    LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName")
+    TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
+    var isUpdateTableStatusRequired = false
+    val uuid = ""
+    try {
+      val (tableDataMaps, dataMapOperationContext) =
+        CommonLoadUtils.firePreLoadEvents(sparkSession,
+          carbonLoadModel,
+          uuid,
+          table,
+          isOverwriteTable,
+          operationContext)
+      // Clean up the old invalid segment data before creating a new entry for new load.
+      SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions)
+      // add the start entry for the new load in the table status file
+      if (!table.isHivePartitionTable) {
+        CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
+          carbonLoadModel,
+          isOverwriteTable)
+        isUpdateTableStatusRequired = true
+      }
+      if (isOverwriteTable) {
+        LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
+      }
+      // Create table and metadata folders if not exist
+      if (carbonLoadModel.isCarbonTransactionalTable) {
+        val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
+        if (!FileFactory.isFileExist(metadataDirectoryPath)) {
+          FileFactory.mkdirs(metadataDirectoryPath)
+        }
       } else {
-        Dataset.ofRows(sparkSession, child)
+        carbonLoadModel.setSegmentId(System.nanoTime().toString)
       }
-    val header = relation.tableSchema.get.fields.map(_.name).mkString(",")
-    loadCommand = CarbonLoadDataCommand(
-      databaseNameOp = Some(relation.carbonRelation.databaseName),
-      tableName = relation.carbonRelation.tableName,
-      factPathFromUser = null,
-      dimFilesPath = Seq(),
-      options = scala.collection.immutable.Map("fileheader" -> header),
-      isOverwriteTable = overwrite,
-      inputSqlString = null,
-      dataFrame = Some(df),
-      updateModel = None,
-      tableInfoOp = None,
-      internalOptions = Map.empty,
-      partition = partition)
-    val load = loadCommand.processMetadata(sparkSession)
-    if (isPersistRequired) {
-      df.unpersist()
-    }
-    load
+      val partitionStatus = SegmentStatus.SUCCESS
+      val loadParams = CarbonLoadParams(sparkSession,
+        tableName,
+        sizeInBytes,
+        isOverwriteTable,
+        carbonLoadModel,
+        hadoopConf,
+        logicalPartitionRelation,
+        dateFormat,
+        timeStampFormat,
+        options,
+        finalPartition,
+        currPartitions,
+        partitionStatus,
+        None,
+        Some(scanResultRdd),
+        None,
+        operationContext)
+      LOGGER.info("Sort Scope : " + carbonLoadModel.getSortScope)
+      val (rows, loadResult) = insertData(loadParams)
+      val info = CommonLoadUtils.makeAuditInfo(loadResult)
+      setAuditInfo(info)
+      CommonLoadUtils.firePostLoadEvents(sparkSession,
+        carbonLoadModel,
+        tableDataMaps,
+        dataMapOperationContext,
+        table,
+        operationContext)
+    } catch {
+      case CausedBy(ex: NoRetryException) =>
+        // update the load entry in table status file for changing the status to marked for delete
+        if (isUpdateTableStatusRequired) {
+          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
+        }
+        LOGGER.error(s"Dataload failure for $dbName.$tableName", ex)
+        throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
+      // In case of event related exception
+      case preEventEx: PreEventException =>
+        LOGGER.error(s"Dataload failure for $dbName.$tableName", preEventEx)
+        throw new AnalysisException(preEventEx.getMessage)
+      case ex: Exception =>
+        LOGGER.error(ex)
+        // update the load entry in table status file for changing the status to marked for delete
+        if (isUpdateTableStatusRequired) {
+          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
+        }
+        throw ex
+    }
+    Seq.empty
   }
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
-    if (null != loadCommand) {
-      val rows = loadCommand.processData(sparkSession)
-      setAuditInfo(loadCommand.auditInfo)
-      rows
+
+  def getReArrangedLogicalPlan(
+      reArrangedIndex: Seq[Int],
+      selectedColumnSchema: Seq[ColumnSchema],
+      convertedStaticPartition: mutable.Map[String, AnyRef]): LogicalPlan = {
+    var processedProject: Boolean = false
+    // check first node is the projection or not
+    logicalPlan match {
+      case _: Project =>
+      // project is already present as first node
+      case _ =>
+        // If project is not present, add the projection to re-arrange it
+        logicalPlan = Project(logicalPlan.output, logicalPlan)
+    }
+    // Re-arrange the project as per columnSchema
+    val newLogicalPlan = logicalPlan.transformDown {
+      //      case logicalRelation: LogicalRelation =>
+      //        getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalRelation)
+      //      case hiveRelation: HiveTableRelation =>
+      //        getReArrangedSchemaHiveRelation(reArrangedIndex, hiveRelation)
+      case p: Project =>
 
 Review comment:
   done
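
For readers following the hunk above: getReArrangedLogicalPlan puts a Project on top of the child plan when one is not already there, then re-orders the project list by a precomputed index sequence so the incoming columns line up with the table's schema ordinals. A minimal standalone sketch of that re-ordering step against plain catalyst types (the helper name and index argument are illustrative, not the PR's exact code):

    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}

    // Re-order a plan's output by a precomputed ordinal list, adding a Project
    // on top only when the plan does not already expose one.
    def reArrangeOutput(plan: LogicalPlan, reArrangedIndex: Seq[Int]): LogicalPlan = {
      val projected = plan match {
        case p: Project => p                        // projection already present as the top node
        case other => Project(other.output, other)  // wrap so the output list can be re-ordered
      }
      Project(reArrangedIndex.map(i => projected.projectList(i)), projected.child)
    }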

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378800963
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##########
 @@ -17,85 +17,517 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
-import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataCommand}
-import org.apache.spark.storage.StorageLevel
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
-
-case class CarbonInsertIntoCommand(
-    relation: CarbonDatasourceHadoopRelation,
-    child: LogicalPlan,
-    overwrite: Boolean,
-    partition: Map[String, Option[String]])
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, CarbonToSparkAdapter, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.CausedBy
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/*
+* insert into without df, by just using logical plan
+*
+*/
+case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
+    tableName: String,
+    options: Map[String, String],
+    isOverwriteTable: Boolean,
+    var logicalPlan: LogicalPlan,
+    var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
+    var inputSqlString: String = null,
+    var tableInfoOp: Option[TableInfo] = None,
+    var internalOptions: Map[String, String] = Map.empty,
+    var partition: Map[String, Option[String]] = Map.empty,
+    var operationContext: OperationContext = new OperationContext)
   extends AtomicRunnableCommand {
 
-  var loadCommand: CarbonLoadDataCommand = _
+  var table: CarbonTable = _
+
+  var logicalPartitionRelation: LogicalRelation = _
+
+  var sizeInBytes: Long = _
+
+  var currPartitions: util.List[PartitionSpec] = _
+
+  var parentTablePath: String = _
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  var scanResultRdd: RDD[InternalRow] = _
+
+  var timeStampFormat: SimpleDateFormat = _
+
+  var dateFormat: SimpleDateFormat = _
+
+  var finalPartition: Map[String, Option[String]] = Map.empty
+
+  var isInsertIntoWithConverterFlow: Boolean = false
+
+  var dataFrame: DataFrame = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    setAuditTable(relation.carbonTable.getDatabaseName, relation.carbonTable.getTableName)
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    def containsLimit(plan: LogicalPlan): Boolean = {
-      plan find {
-        case limit: GlobalLimit => true
-        case other => false
-      } isDefined
+    if (!tableInfoOp.isDefined) {
 
 Review comment:
   yes. done
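
Aside for readers: the guard under discussion only fails fast when no TableInfo was handed to the command. A minimal equivalent written against Option directly, shown as a sketch because the revised line itself is not quoted in this thread:

    import org.apache.carbondata.core.metadata.schema.table.TableInfo

    // Fail fast when the caller did not supply table metadata.
    def requireTableInfo(tableInfoOp: Option[TableInfo]): TableInfo =
      tableInfoOp.getOrElse(
        throw new RuntimeException("table info must be present when logical relation exists"))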

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378801215
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 ##########
 @@ -953,28 +983,46 @@ object CarbonDataRDDFactory {
 
   /**
    * Execute load process to load from input dataframe
+   *
+   * @param sqlContext sql context
+   * @param dataFrame optional dataframe for insert
+   * @param scanResultRDD optional internal row rdd for direct insert
+   * @param carbonLoadModel load model
+   * @return Return an array that contains all of the elements in NewDataFrameLoaderRDD.
    */
   private def loadDataFrame(
       sqlContext: SQLContext,
       dataFrame: Option[DataFrame],
+      scanResultRDD: Option[RDD[InternalRow]],
       carbonLoadModel: CarbonLoadModel
   ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
     try {
-      val rdd = dataFrame.get.rdd
-
+      val rdd = if (dataFrame.isDefined) {
+        dataFrame.get.rdd
+      } else {
+        scanResultRDD.get
+      }
       val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]] { p =>
         DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
       }.distinct.length
       val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(
         nodeNumOfData,
         sqlContext.sparkContext)
-      val newRdd = new DataLoadCoalescedRDD[Row](sqlContext.sparkSession, rdd, nodes.toArray
-        .distinct)
-
+      val newRdd =
+        if (dataFrame.isDefined) {
+          new DataLoadCoalescedRDD[Row](
+            sqlContext.sparkSession, dataFrame.get.rdd, nodes.toArray.distinct)
+        } else {
+          new DataLoadCoalescedRDD[InternalRow](
+            sqlContext.sparkSession,
+            scanResultRDD.get,
+            nodes.toArray.distinct)
+        }
       new NewDataFrameLoaderRDD(
         sqlContext.sparkSession,
         new DataLoadResultImpl(),
         carbonLoadModel,
+        dataFrame.isDefined,
 
 Review comment:
   yes. done like that
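
For context on the hunk above: the node count comes from the distinct hosts that report a preferred location for any partition of the input RDD, and that count is then used to request executors before the load is coalesced. A minimal sketch of the host-counting step using only the public RDD API (the PR itself goes through DataLoadPartitionCoalescer.getPreferredLocs, which is not reproduced here):

    import org.apache.spark.rdd.RDD

    // Count the distinct hosts that hold at least one partition of the input;
    // this is the number of nodes worth asking for before coalescing the load.
    def distinctPreferredHosts(rdd: RDD[_]): Int =
      rdd.partitions
        .flatMap(p => rdd.preferredLocations(p))
        .distinct
        .length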

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378801391
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 ##########
 @@ -304,11 +305,11 @@ object CarbonDataRDDFactory {
   def loadCarbonData(
       sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
-      columnar: Boolean,
       partitionStatus: SegmentStatus = SegmentStatus.SUCCESS,
       overwriteTable: Boolean,
       hadoopConf: Configuration,
       dataFrame: Option[DataFrame] = None,
+      scanResultRdd : Option[RDD[InternalRow]],
 
 Review comment:
   done

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378801434
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
 ##########
 @@ -263,17 +265,35 @@ class NewDataFrameLoaderRDD[K, V](
         carbonLoadModel.setPreFetch(false)
 
         val recordReaders = mutable.Buffer[CarbonIterator[Array[AnyRef]]]()
-        val partitionIterator = firstParent[DataLoadPartitionWrap[Row]].iterator(theSplit, context)
         val serializer = SparkEnv.get.closureSerializer.newInstance()
         var serializeBytes: Array[Byte] = null
-        while(partitionIterator.hasNext) {
-          val value = partitionIterator.next()
-          if (serializeBytes == null) {
-            serializeBytes = serializer.serialize[RDD[Row]](value.rdd).array()
+        if (isDataFrame) {
+          val partitionIterator = firstParent[DataLoadPartitionWrap[Row]].iterator(theSplit,
+            context)
+          while (partitionIterator.hasNext) {
+            val value = partitionIterator.next()
+            if (serializeBytes == null) {
+              serializeBytes = serializer.serialize[RDD[Row]](value.rdd).array()
+            }
+            recordReaders += new LazyRddIterator(serializer, serializeBytes, value.partition,
+              carbonLoadModel, context)
           }
-          recordReaders += new LazyRddIterator(serializer, serializeBytes, value.partition,
+        } else {
+          // For internal row, no need of converter and re-arrange step,
+          model.setLoadWithoutConverterWithoutReArrangeStep(true)
 
 Review comment:
   yes. done like that
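
For context: the dataframe branch above serializes the wrapped RDD once with Spark's closure serializer and reuses those bytes for every LazyRddIterator it builds, while the internal-row branch skips the converter and re-arrange step entirely. A small standalone sketch of the serialize-once pattern (the payload type here is a generic stand-in, not the PR's DataLoadPartitionWrap):

    import java.nio.ByteBuffer
    import scala.reflect.ClassTag
    import org.apache.spark.SparkEnv

    // Serialize a value once and pass the bytes around; each consumer
    // deserializes lazily with its own serializer instance.
    def serializeOnce[T: ClassTag](value: T): Array[Byte] =
      SparkEnv.get.closureSerializer.newInstance().serialize(value).array()

    def deserializeLazily[T: ClassTag](bytes: Array[Byte]): T =
      SparkEnv.get.closureSerializer.newInstance().deserialize[T](ByteBuffer.wrap(bytes))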

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378801536
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
 ##########
 @@ -834,4 +843,179 @@ object CommonUtil {
     displaySize
   }
 
+  def getObjectArrayFromInternalRowAndConvertComplexType(row: InternalRow,
+      fieldTypes: Seq[DataType],
+      outputArrayLength: Int): Array[AnyRef] = {
+    val data = new Array[AnyRef](outputArrayLength)
+    var i = 0
+    val fieldTypesLen = fieldTypes.length
+    while (i < fieldTypesLen) {
+      if (!row.isNullAt(i)) {
+        fieldTypes(i) match {
+          case StringType =>
+            data(i) = row.getString(i)
+          case d: DecimalType =>
+            data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+          case arrayType : ArrayType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getArray(i), arrayType)
+          case structType : StructType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getStruct(i,
+              structType.fields.length), structType)
+          case mapType : MapType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getMap(i), mapType)
+          case other =>
 
 Review comment:
   it was from base code.
   
   yes. done like that
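
For readers of the hunk above: the helper walks the declared Spark field types and pulls each non-null value out of the InternalRow as a plain JVM object, with dedicated handling for strings, decimals and complex types. A trimmed sketch of the primitive part (complex-type conversion and the Carbon-specific output layout are left out, and the generic fallback branch is an assumption rather than the PR's exact behaviour):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.types._

    // Extract non-null values from an InternalRow as JVM objects, driven by
    // the declared Spark data types of the fields.
    def toObjectArray(row: InternalRow, fieldTypes: Seq[DataType]): Array[AnyRef] = {
      val data = new Array[AnyRef](fieldTypes.length)
      var i = 0
      while (i < fieldTypes.length) {
        if (!row.isNullAt(i)) {
          data(i) = fieldTypes(i) match {
            case StringType => row.getString(i)             // UTF8String -> java String
            case d: DecimalType => row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
            case other => row.get(i, other)                 // generic getter for remaining types
          }
        }
        i += 1
      }
      data
    }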

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378801595
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
 ##########
 @@ -140,6 +141,89 @@ object CarbonScalaUtil {
     }
   }
 
+  /**
+   * Converts incoming value to String after converting data as per the data type.
+   *
+   * @param value           Input value to convert
+   * @param dataType        Datatype to convert and then convert to String
+   * @param timeStampFormat Timestamp format to convert in case of timestamp datatypes
+   * @param dateFormat      DataFormat to convert in case of DateType datatype
+   * @return converted String
+   */
+  def convertStaticPartitionToValues(
+      value: String,
+      dataType: DataType,
+      timeStampFormat: SimpleDateFormat,
+      dateFormat: SimpleDateFormat): AnyRef = {
+    val defaultValue = value != null && value.equalsIgnoreCase(hiveDefaultPartition)
+    try {
+      dataType match {
+        case TimestampType if timeStampFormat != null =>
+          val formattedString =
+            if (defaultValue) {
+              timeStampFormat.format(new Date())
+            } else {
+              timeStampFormat.format(DateTimeUtils.stringToTime(value))
+            }
+          val convertedValue =
+            DataTypeUtil
+              .getDataBasedOnDataType(formattedString,
+                CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(TimestampType))
+          convertedValue
+        case DateType if dateFormat != null =>
+          val formattedString =
+            if (defaultValue) {
+              dateFormat.format(new Date())
+            } else {
+              dateFormat.format(DateTimeUtils.stringToTime(value))
+            }
+          val convertedValue =
+            DataTypeUtil
+              .getDataBasedOnDataType(formattedString,
+                CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(TimestampType))
+          val date = generateDictionaryKey(convertedValue.asInstanceOf[Long])
+          date.asInstanceOf[AnyRef]
+        case BinaryType =>
+          // TODO: decode required ? currently it is working
+          ByteUtil.toBytes(value)
+        case _ =>
+          val convertedValue =
+            DataTypeUtil
+              .getDataBasedOnDataType(value,
+                CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataType))
+          if (convertedValue == null) {
+            if (defaultValue) {
+              dataType match {
+                case BooleanType =>
+                  return false.asInstanceOf[AnyRef]
+                case _ =>
+                  return 0.asInstanceOf[AnyRef]
+              }
+            }
+            throw new MalformedCarbonCommandException(
+              s"Value $value with datatype $dataType on static partition is not correct")
+          }
+          convertedValue
+      }
+    } catch {
+      case e: Exception =>
+        throw new MalformedCarbonCommandException(
+          s"Value $value with datatype $dataType on static partition is not correct")
+    }
+  }
+
+  def generateDictionaryKey(timeValue: Long): Int = {
+    if (timeValue < DateDirectDictionaryGenerator.MIN_VALUE ||
+        timeValue > DateDirectDictionaryGenerator.MAX_VALUE) {
+      if (LOGGER.isDebugEnabled) {
+        LOGGER.debug("Value for date type column is not in valid range. Value considered as null.")
+      }
+      return CarbonCommonConstants.DIRECT_DICT_VALUE_NULL
+    }
+    Math.floor(timeValue.toDouble / DateDirectDictionaryGenerator.MILLIS_PER_DAY).toInt +
+    DateDirectDictionaryGenerator.cutOffDate
 
 Review comment:
   no change
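
For context on generateDictionaryKey above: an epoch-millis value is floored to whole days and shifted by the generator's cut-off offset to get the date surrogate key, and values outside the supported range map to the null sentinel. A minimal sketch of that arithmetic with the constants passed in instead of read from DateDirectDictionaryGenerator (their actual values are not reproduced here):

    // Day-granularity surrogate key: floor(millis / millisPerDay) + cutOffDate,
    // or the null sentinel when the value falls outside the supported range.
    def dictionaryKey(
        timeValueMillis: Long,
        millisPerDay: Long,
        cutOffDate: Int,
        minValue: Long,
        maxValue: Long,
        nullSentinel: Int): Int = {
      if (timeValueMillis < minValue || timeValueMillis > maxValue) {
        nullSentinel
      } else {
        math.floor(timeValueMillis.toDouble / millisPerDay).toInt + cutOffDate
      }
    }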

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378801875
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##########
 @@ -17,85 +17,517 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
-import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataCommand}
-import org.apache.spark.storage.StorageLevel
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
-
-case class CarbonInsertIntoCommand(
-    relation: CarbonDatasourceHadoopRelation,
-    child: LogicalPlan,
-    overwrite: Boolean,
-    partition: Map[String, Option[String]])
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, CarbonToSparkAdapter, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.CausedBy
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/*
+* insert into without df, by just using logical plan
+*
+*/
+case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
+    tableName: String,
+    options: Map[String, String],
+    isOverwriteTable: Boolean,
+    var logicalPlan: LogicalPlan,
+    var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
 
 Review comment:
   done

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378801973
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala
 ##########
 @@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import java.util
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{DataLoadTableFileMapping, UpdateTableModel}
+import org.apache.spark.util.CausedBy
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.schema.table.TableInfo
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+
+/*
+* insert into with df, doesn't use logical plan
+*
+*/
+case class CarbonInsertIntoWithDf(databaseNameOp: Option[String],
+    tableName: String,
+    options: Map[String, String],
+    isOverwriteTable: Boolean,
+    var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
 
 Review comment:
   done

[GitHub] [carbondata] CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#issuecomment-585718233
 
 
   Build Success with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/274/
   

[GitHub] [carbondata] CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow

GitBox
In reply to this post by GitBox
CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#issuecomment-585739133
 
 
  Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1977/
   
