[GitHub] carbondata pull request #1837: [WIP] Refactored code segregated process meta...

[GitHub] carbondata pull request #1837: [WIP] Refactored code segregated process meta...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1837#discussion_r162910788
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -649,6 +659,7 @@ case class CarbonLoadDataCommand(
           CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT)
           CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
           CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
    +      CarbonSession.threadUnset("partition.opeartioncontext")
    --- End diff --
   
    Typo: `opeartioncontext` should be `operationcontext`. Better to move it to CarbonCommonConstants.
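    Centralising the key means a misspelling such as `opeartioncontext` can exist in only one place, and a reference to a misnamed constant fails at compile time. A minimal sketch, assuming a hypothetical constants object and a simplified thread-local store standing in for `CarbonSession.threadUnset` and its setter counterpart. The constant name `CARBON_PARTITION_OPERATION_CONTEXT` is illustrative, not taken from the PR:

    ```scala
    import scala.collection.mutable

    object CommonConstantsSketch {
      // Hypothetical constant; the name is an assumption for illustration.
      val CARBON_PARTITION_OPERATION_CONTEXT: String = "partition.operationcontext"
    }

    // Hypothetical simplified per-thread option store, loosely modelled on CarbonSession.
    object ThreadOptionsSketch {
      // One option map per thread, so concurrent loads do not see each other's settings.
      private val options: ThreadLocal[mutable.Map[String, Any]] =
        new ThreadLocal[mutable.Map[String, Any]] {
          override def initialValue(): mutable.Map[String, Any] = mutable.Map.empty[String, Any]
        }

      def threadSet(key: String, value: Any): Unit = options.get().put(key, value)

      def threadUnset(key: String): Unit = options.get().remove(key)

      def threadGet(key: String): Option[Any] = options.get().get(key)
    }
    ```

    Every call site then writes `threadUnset(CommonConstantsSketch.CARBON_PARTITION_OPERATION_CONTEXT)` instead of repeating the string literal.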


---

[GitHub] carbondata issue #1837: [WIP] Refactored code segregated process meta and pr...

qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1837
 
    @kumarvishal09 Please remove WIP and add the JIRA ID if you are done with this PR.


---

[GitHub] carbondata issue #1837: [WIP] Refactored code segregated process meta and pr...

qiuchenjian-2
Github user kumarvishal09 commented on the issue:

    https://github.com/apache/carbondata/pull/1837
 
    @ravipesala I am working on the auto merger; after that I will update the PR.


---

[GitHub] carbondata issue #1837: [WIP] Refactored code segregated process meta and pr...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1837
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1865/



---

[GitHub] carbondata issue #1837: [WIP] Refactored code segregated process meta and pr...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1837
 
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3096/



---

[GitHub] carbondata issue #1837: [WIP] Refactored code segregated process meta and pr...

qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1837
 
    SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3078/



---

[GitHub] carbondata issue #1837: [WIP] Refactored code segregated process meta and pr...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1837
 
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3097/



---

[GitHub] carbondata issue #1837: [WIP] Refactored code segregated process meta and pr...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1837
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1867/



---

[GitHub] carbondata issue #1837: [WIP] Refactored code segregated process meta and pr...

qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1837
 
    SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3079/



---

[GitHub] carbondata issue #1837: [WIP] Refactored code segregated process meta and pr...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1837
 
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3098/



---

[GitHub] carbondata pull request #1837: [WIP] Refactored code segregated process meta...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1837#discussion_r163751453
 
    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala ---
    @@ -57,30 +58,16 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
             CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
             carbonLoadModel.getDatabaseName + "." +
             carbonLoadModel.getTableName, "false")
    -      val headers = carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
    -        .map(_.getColumnName).mkString(",")
    -      // Creating a new query string to insert data into pre-aggregate table from that same table.
    -      // For example: To compact preaggtable1 we can fire a query like insert into preaggtable1
    -      // select * from preaggtable1
    -      // The following code will generate the select query with a load UDF that will be used to
    -      // apply DataLoadingRules
    -      val childDataFrame = sqlContext.sparkSession.sql(new CarbonSpark2SqlParser()
    -        // adding the aggregation load UDF
    -        .addPreAggLoadFunction(
    -        // creating the select query on the bases on table schema
    -        PreAggregateUtil.createChildSelectQuery(
    -          carbonTable.getTableInfo.getFactTable, carbonTable.getDatabaseName))).drop("preAggLoad")
    +      CarbonSession.updateSessionInfoToCurrentThread(sqlContext.sparkSession)
    +      val loadCommand = operationContext.getProperty(carbonTable.getTableName + "_Compaction")
    +        .asInstanceOf[CarbonLoadDataCommand]
           try {
    -        CarbonLoadDataCommand(
    -          Some(carbonTable.getDatabaseName),
    -          carbonTable.getTableName,
    -          null,
    -          Nil,
    -          Map("fileheader" -> headers),
    -          isOverwriteTable = false,
    -          dataFrame = Some(childDataFrame),
    -          internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true",
    -            "mergedSegmentName" -> mergedLoadName)).run(sqlContext.sparkSession)
    +        val newInternalOptions = loadCommand.internalOptions ++
    +                                 Map("mergedSegmentName" -> mergedLoadName)
    +        loadCommand.internalOptions = newInternalOptions
    +        loadCommand.dataFrame = Some(PreAggregateUtil
    --- End diff --
   
    Please correct the formatting:
    ```
    loadCommand.dataFrame =
              Some(PreAggregateUtil.getDataFrame(sqlContext.sparkSession, loadCommand.logicalPlan.get))
    ```
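    For context, the diff above replaces building a fresh `CarbonLoadDataCommand` with reusing one that was prepared during the metadata phase and stored in the operation context under `<tableName>_Compaction`. The sketch below illustrates that hand-off with hypothetical, simplified classes (not the CarbonData API). It uses a pattern match instead of `asInstanceOf`, so a missing command produces a descriptive error rather than a `NullPointerException` further down:

    ```scala
    import scala.collection.mutable

    // Hypothetical simplified OperationContext: a property bag shared by a command
    // and the listeners it notifies.
    class OperationContextSketch {
      private val properties = mutable.Map.empty[String, Any]
      def setProperty(key: String, value: Any): Unit = properties.put(key, value)
      def getProperty(key: String): Option[Any] = properties.get(key)
    }

    // Hypothetical stand-in for CarbonLoadDataCommand with a mutable internalOptions field.
    class LoadCommandSketch(var internalOptions: Map[String, String])

    object CompactorSketch {
      // Fetches the command prepared in the metadata phase and adds the merged segment
      // name to its internal options, keeping the options that were already set.
      def prepareChildLoad(
          context: OperationContextSketch,
          tableName: String,
          mergedLoadName: String): LoadCommandSketch = {
        context.getProperty(tableName + "_Compaction") match {
          case Some(command: LoadCommandSketch) =>
            command.internalOptions =
              command.internalOptions ++ Map("mergedSegmentName" -> mergedLoadName)
            command
          case _ =>
            throw new IllegalStateException(
              s"No compaction load command registered for $tableName")
        }
      }
    }
    ```

    Because `++` on an immutable `Map` returns a new map, the result has to be assigned back to the `var`, which is why the diff makes `internalOptions` mutable.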


---

[GitHub] carbondata pull request #1837: [WIP] Refactored code segregated process meta...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1837#discussion_r163753157
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---
    @@ -53,42 +54,50 @@ import org.apache.carbondata.streaming.segment.StreamSegment
      */
     case class CarbonAlterTableCompactionCommand(
         alterTableModel: AlterTableModel,
    -    tableInfoOp: Option[TableInfo] = None)
    -  extends DataCommand {
    +    tableInfoOp: Option[TableInfo] = None,
    +    val operationContext: OperationContext = new OperationContext )
    +  extends AtomicRunnableCommand {
    --- End diff --
   
    Add a new line here.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1837: [WIP] Refactored code segregated process meta...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1837#discussion_r163753419
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---
    @@ -53,42 +54,50 @@ import org.apache.carbondata.streaming.segment.StreamSegment
      */
     case class CarbonAlterTableCompactionCommand(
         alterTableModel: AlterTableModel,
    -    tableInfoOp: Option[TableInfo] = None)
    -  extends DataCommand {
    +    tableInfoOp: Option[TableInfo] = None,
    +    val operationContext: OperationContext = new OperationContext )
    +  extends AtomicRunnableCommand {
    +  var table: CarbonTable = _
     
    -  override def processData(sparkSession: SparkSession): Seq[Row] = {
    -    val LOGGER: LogService =
    -      LogServiceFactory.getLogService(this.getClass.getName)
    +  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
    +    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
         val tableName = alterTableModel.tableName.toLowerCase
    -    val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
    -
    -    val table = if (tableInfoOp.isDefined) {
    -      val tableInfo = tableInfoOp.get
    -      //   To DO: CarbonEnv.updateStorePath
    -      CarbonTable.buildFromTableInfo(tableInfo)
    +    val dbName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
    +    table = if (tableInfoOp.isDefined) {
    +      CarbonTable.buildFromTableInfo(tableInfoOp.get)
         } else {
    -      val relation =
    -        CarbonEnv.getInstance(sparkSession).carbonMetastore
    -          .lookupRelation(Option(databaseName), tableName)(sparkSession)
    -          .asInstanceOf[CarbonRelation]
    +      val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
    +        .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
           if (relation == null) {
    -        throw new NoSuchTableException(databaseName, tableName)
    +        throw new NoSuchTableException(dbName, tableName)
           }
           if (null == relation.carbonTable) {
    -        LOGGER.error(s"alter table failed. table not found: $databaseName.$tableName")
    -        throw new NoSuchTableException(databaseName, tableName)
    +        LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
    +        LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
    --- End diff --
   
    Remove the duplicate audit log.
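    With the duplicate removed, the lookup logs the failure once and then throws. A simplified sketch of that resolution step, using hypothetical stand-ins for `TableInfo`, `CarbonTable` and `CarbonRelation`, and collecting log lines in a buffer instead of calling `LogService`. The message text is illustrative:

    ```scala
    import scala.collection.mutable

    // Hypothetical, simplified types; not the CarbonData classes.
    final case class TableInfoSketch(dbName: String, tableName: String)
    final case class CarbonTableSketch(dbName: String, tableName: String)
    final case class RelationSketch(carbonTable: CarbonTableSketch)

    final class NoSuchTableSketchException(db: String, table: String)
      extends Exception(s"Table or view '$table' not found in database '$db'")

    object TableResolverSketch {
      // Stand-in for the logger, so the single-log behaviour can be observed.
      val logged: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]

      def resolve(
          tableInfoOp: Option[TableInfoSketch],
          dbName: String,
          tableName: String,
          lookup: (String, String) => RelationSketch): CarbonTableSketch = {
        tableInfoOp match {
          case Some(info) => CarbonTableSketch(info.dbName, info.tableName)
          case None =>
            val relation = lookup(dbName, tableName)
            if (relation == null) {
              throw new NoSuchTableSketchException(dbName, tableName)
            }
            if (relation.carbonTable == null) {
              // A single log statement; no second, duplicated audit call.
              logged += s"Alter table failed. table not found: $dbName.$tableName"
              throw new NoSuchTableSketchException(dbName, tableName)
            }
            relation.carbonTable
        }
      }
    }
    ```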


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1837: [WIP] Refactored code segregated process meta...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1837#discussion_r163757369
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---
    @@ -53,42 +54,50 @@ import org.apache.carbondata.streaming.segment.StreamSegment
      */
     case class CarbonAlterTableCompactionCommand(
         alterTableModel: AlterTableModel,
    -    tableInfoOp: Option[TableInfo] = None)
    -  extends DataCommand {
    +    tableInfoOp: Option[TableInfo] = None,
    +    val operationContext: OperationContext = new OperationContext )
    +  extends AtomicRunnableCommand {
    +  var table: CarbonTable = _
     
    -  override def processData(sparkSession: SparkSession): Seq[Row] = {
    -    val LOGGER: LogService =
    -      LogServiceFactory.getLogService(this.getClass.getName)
    +  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
    +    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
         val tableName = alterTableModel.tableName.toLowerCase
    -    val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
    -
    -    val table = if (tableInfoOp.isDefined) {
    -      val tableInfo = tableInfoOp.get
    -      //   To DO: CarbonEnv.updateStorePath
    -      CarbonTable.buildFromTableInfo(tableInfo)
    +    val dbName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
    +    table = if (tableInfoOp.isDefined) {
    +      CarbonTable.buildFromTableInfo(tableInfoOp.get)
         } else {
    -      val relation =
    -        CarbonEnv.getInstance(sparkSession).carbonMetastore
    -          .lookupRelation(Option(databaseName), tableName)(sparkSession)
    -          .asInstanceOf[CarbonRelation]
    +      val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
    +        .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
           if (relation == null) {
    -        throw new NoSuchTableException(databaseName, tableName)
    +        throw new NoSuchTableException(dbName, tableName)
           }
           if (null == relation.carbonTable) {
    -        LOGGER.error(s"alter table failed. table not found: $databaseName.$tableName")
    -        throw new NoSuchTableException(databaseName, tableName)
    +        LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
    +        LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
    +        throw new NoSuchTableException(dbName, tableName)
           }
           relation.carbonTable
         }
    +    if (CarbonUtil.hasAggregationDataMap(table) ||
    +        (table.isChildDataMap && null == operationContext.getProperty(table.getTableName))) {
    --- End diff --
   
    I think we always need to fire the event, irrespective of this condition. If the listener should not execute in some cases, move this `if` condition into the listener instead of keeping it here.
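    The suggestion is that the command stays generic: it always fires its metadata event, and each registered listener decides whether it has anything to do. A minimal sketch of that split, using hypothetical simplified event, listener and bus types (the real `OperationListenerBus` and `OperationContext` signatures differ). The guard from the diff moves into the pre-aggregate listener:

    ```scala
    import scala.collection.mutable

    // Hypothetical, simplified event types; not the CarbonData API.
    trait EventSketch
    final case class TableSketch(name: String, hasAggregationDataMap: Boolean, isChildDataMap: Boolean)
    final case class CompactionMetadataEvent(table: TableSketch) extends EventSketch

    trait ListenerSketch {
      def onEvent(event: EventSketch, context: mutable.Map[String, Any]): Unit
    }

    object ListenerBusSketch {
      private val listeners = mutable.ArrayBuffer.empty[ListenerSketch]
      def addListener(listener: ListenerSketch): Unit = listeners += listener
      def fireEvent(event: EventSketch, context: mutable.Map[String, Any]): Unit =
        listeners.foreach(_.onEvent(event, context))
    }

    // The listener, not the command, decides whether there is work to do.
    object PreAggCompactionListenerSketch extends ListenerSketch {
      var handled = 0
      override def onEvent(event: EventSketch, context: mutable.Map[String, Any]): Unit =
        event match {
          case CompactionMetadataEvent(table)
              if table.hasAggregationDataMap ||
                (table.isChildDataMap && !context.contains(table.name)) =>
            handled += 1
          case _ => // not a pre-aggregate case: nothing to prepare
        }
    }

    object CompactionCommandSketch {
      // The command always fires the event, with no pre-aggregate-specific check.
      def processMetadata(table: TableSketch, context: mutable.Map[String, Any]): Unit =
        ListenerBusSketch.fireEvent(CompactionMetadataEvent(table), context)
    }
    ```

    A design benefit of this split is that other listeners (for example, for other datamap types) can react to the same event without the command knowing about them.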


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1837: [WIP] Refactored code segregated process meta...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1837#discussion_r163757600
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -87,12 +84,41 @@ case class CarbonLoadDataCommand(
         options: scala.collection.immutable.Map[String, String],
         isOverwriteTable: Boolean,
         var inputSqlString: String = null,
    -    dataFrame: Option[DataFrame] = None,
    +    var dataFrame: Option[DataFrame] = None,
         updateModel: Option[UpdateTableModel] = None,
         var tableInfoOp: Option[TableInfo] = None,
    -    internalOptions: Map[String, String] = Map.empty,
    -    partition: Map[String, Option[String]] = Map.empty) extends DataCommand {
    +    var internalOptions: Map[String, String] = Map.empty,
    +    partition: Map[String, Option[String]] = Map.empty,
    +    logicalPlan: Option[LogicalPlan] = None,
    +    var operationContext: OperationContext = new OperationContext) extends AtomicRunnableCommand {
     
    +  var table: CarbonTable = _
    +
    +  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
    +    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +    val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
    +    table = if (tableInfoOp.isDefined) {
    +        CarbonTable.buildFromTableInfo(tableInfoOp.get)
    +      } else {
    +        val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
    +          .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
    +        if (relation == null) {
    +          throw new NoSuchTableException(dbName, tableName)
    +        }
    +        if (null == relation.carbonTable) {
    +          LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
    +          LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
    +          throw new NoSuchTableException(dbName, tableName)
    +        }
    +        relation.carbonTable
    +      }
    +    operationContext.setProperty("isOverwrite", isOverwriteTable)
    +    if (CarbonUtil.hasAggregationDataMap(table)) {
    +        val loadMetadataEvent = new LoadMetadataEvent(table, false)
    +        OperationListenerBus.getInstance().fireEvent(loadMetadataEvent, operationContext)
    --- End diff --
   
    Always fire the event here, and handle the `if` condition inside the registered listener.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1837: [WIP] Refactored code segregated process meta...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1837#discussion_r163757666
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -649,6 +659,7 @@ case class CarbonLoadDataCommand(
           CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT)
           CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
           CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
    +      CarbonSession.threadUnset("partition.opeartioncontext")
    --- End diff --
   
    Please move this to CarbonCommonConstants.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1837: [WIP] Refactored code segregated process meta...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1837#discussion_r163757962
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -20,18 +20,141 @@ package org.apache.spark.sql.execution.command.preaaggregate
     import scala.collection.JavaConverters._
     import scala.collection.mutable
     
    -import org.apache.spark.sql.CarbonEnv
    -import org.apache.spark.sql.SparkSession
    +import org.apache.spark.sql.{SparkSession}
     import org.apache.spark.sql.catalyst.TableIdentifier
     import org.apache.spark.sql.execution.command.AlterTableModel
    -import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand
    +import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonLoadDataCommand}
    +import org.apache.spark.sql.parser.CarbonSpark2SqlParser
     
    -import org.apache.carbondata.core.constants.CarbonCommonConstants
     import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
     import org.apache.carbondata.core.util.CarbonUtil
     import org.apache.carbondata.events._
    -import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
    +import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
     
    +object CompactionProcessMetaListener extends OperationEventListener {
    --- End diff --
   
    Add a comment describing its usage.
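    One possible shape for the usage comment, shown on a hypothetical simplified listener (not the PR's actual code). It documents the contract visible in the diff: for every child table a load command is prepared and stored in the operation context under `<childTableName>_Compaction`, where the compaction data phase later retrieves it:

    ```scala
    import scala.collection.mutable

    // Hypothetical minimal types; not the CarbonData API.
    final case class ChildSchemaSketch(tableName: String)
    final case class ParentTableSketch(name: String, children: Seq[ChildSchemaSketch])
    final case class PreparedLoadSketch(targetTable: String)

    /**
     * Listener for the metadata phase of compaction on a table that has
     * pre-aggregate child tables.
     *
     * Usage: register it for the compaction metadata event. For every child table
     * it prepares a load command and stores it in the operation context under the
     * key `<childTableName>_Compaction`, so that the data phase can fetch the
     * command instead of building it again.
     */
    object CompactionProcessMetaListenerSketch {
      def compactionKey(childTableName: String): String = childTableName + "_Compaction"

      def onEvent(parent: ParentTableSketch, context: mutable.Map[String, Any]): Unit =
        parent.children.foreach { child =>
          context(compactionKey(child.tableName)) = PreparedLoadSketch(child.tableName)
        }
    }
    ```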


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1837: [WIP] Refactored code segregated process meta...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1837#discussion_r163758021
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -20,18 +20,141 @@ package org.apache.spark.sql.execution.command.preaaggregate
     import scala.collection.JavaConverters._
     import scala.collection.mutable
     
    -import org.apache.spark.sql.CarbonEnv
    -import org.apache.spark.sql.SparkSession
    +import org.apache.spark.sql.{SparkSession}
     import org.apache.spark.sql.catalyst.TableIdentifier
     import org.apache.spark.sql.execution.command.AlterTableModel
    -import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand
    +import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonLoadDataCommand}
    +import org.apache.spark.sql.parser.CarbonSpark2SqlParser
     
    -import org.apache.carbondata.core.constants.CarbonCommonConstants
     import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
     import org.apache.carbondata.core.util.CarbonUtil
     import org.apache.carbondata.events._
    -import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
    +import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
     
    +object CompactionProcessMetaListener extends OperationEventListener {
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event
    +   * @param operationContext
    +   */
    +  override protected def onEvent(event: Event,
    +      operationContext: OperationContext): Unit = {
    +    val sparkSession = SparkSession.getActiveSession.get
    +    val tableEvent = event.asInstanceOf[LoadMetadataEvent]
    +    val table = tableEvent.getCarbonTable
    +    if (!table.isChildDataMap && CarbonUtil.hasAggregationDataMap(table)) {
    +      val aggregationDataMapList = table.getTableInfo.getDataMapSchemaList.asScala
    +        .filter(_.isInstanceOf[AggregationDataMapSchema])
    +        .asInstanceOf[mutable.ArrayBuffer[AggregationDataMapSchema]]
    +      for (dataMapSchema: AggregationDataMapSchema <- aggregationDataMapList) {
    +        val childTableName = dataMapSchema.getRelationIdentifier.getTableName
    +        val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName
    +        // Creating a new query string to insert data into pre-aggregate table from that same table.
    +        // For example: To compact preaggtable1 we can fire a query like insert into preaggtable1
    +        // select * from preaggtable1
    +        // The following code will generate the select query with a load UDF that will be used to
    +        // apply DataLoadingRules
    +        val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser()
    +          // adding the aggregation load UDF
    +          .addPreAggLoadFunction(
    +          // creating the select query on the bases on table schema
    +        PreAggregateUtil.createChildSelectQuery(
    +          dataMapSchema.getChildSchema, table.getDatabaseName))).drop("preAggLoad")
    +        val loadCommand = PreAggregateUtil.createLoadCommandForChild(
    +          dataMapSchema.getChildSchema.getListOfColumns,
    +          TableIdentifier(childTableName, Some(childDatabaseName)),
    +          childDataFrame,
    +          false,
    +          sparkSession)
    +        operationContext
    +          .setProperty(dataMapSchema.getChildSchema.getTableName + "_Compaction", loadCommand)
    +      }
    +    } else {
    +      val childTableName = table.getTableName
    +      val childDatabaseName = table.getDatabaseName
    +      // Creating a new query string to insert data into pre-aggregate table from that same table.
    +      // For example: To compact preaggtable1 we can fire a query like insert into preaggtable1
    +      // select * from preaggtable1
    +      // The following code will generate the select query with a load UDF that will be used to
    +      // apply DataLoadingRules
    +      val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser()
    +        // adding the aggregation load UDF
    +        .addPreAggLoadFunction(
    +        // creating the select query on the bases on table schema
    +        PreAggregateUtil.createChildSelectQuery(
    +          table.getTableInfo.getFactTable, table.getDatabaseName))).drop("preAggLoad")
    +      val loadCommand = PreAggregateUtil.createLoadCommandForChild(
    +        table.getTableInfo.getFactTable.getListOfColumns,
    +        TableIdentifier(childTableName, Some(childDatabaseName)),
    +        childDataFrame,
    +        false,
    +        sparkSession)
    +      operationContext.setProperty(table.getTableName + "_Compaction", loadCommand)
    +    }
    +  }
    +}
    +object LoadProcessMetaListener extends OperationEventListener {
    --- End diff --
   
    Add a comment.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1837: [WIP] Refactored code segregated process meta...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1837#discussion_r163760282
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -900,4 +876,33 @@ object PreAggregateUtil {
             aggDataMapSchema.getProperties.get("CHILD_SELECT QUERY").replace("&", "=")),
           CarbonCommonConstants.DEFAULT_CHARSET)
       }
    +
    +  /**
    +   * This method will start load process on the data map
    +   */
    +  def createLoadCommandForChild(
    +      columns: java.util.List[ColumnSchema],
    +      dataMapIdentifier: TableIdentifier,
    +      dataFrame: DataFrame,
    +      isOverwrite: Boolean,
    +      sparkSession: SparkSession): CarbonLoadDataCommand = {
    +    val headers = columns.asScala.filter { column =>
    +      !column.getColumnName.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)
    +    }.sortBy(_.getSchemaOrdinal).map(_.getColumnName).mkString(",")
    +    val loadCommand = CarbonLoadDataCommand(dataMapIdentifier.database,
    +      dataMapIdentifier.table,
    +      null,
    +      Nil,
    +      Map("fileheader" -> headers),
    +      isOverwriteTable = isOverwrite,
    +      dataFrame = None,
    +      internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true"),
    +      logicalPlan = Some(dataFrame.queryExecution.logical))
    +    loadCommand.processMetadata(sparkSession)
    --- End diff --
   
    Better not to process here. Just return the command and call `processMetadata(sparkSession)` in the caller; that is more meaningful, since the method name is `createLoadCommandForChild`.
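    A sketch of the suggested split, with hypothetical simplified types: the factory only computes the file header (dropping the invisible dummy measure and ordering by schema ordinal, as in the diff) and returns the command, and the caller decides when to run `processMetadata`. The dummy-measure value below is an assumption for illustration:

    ```scala
    // Hypothetical simplified ColumnSchema: only the fields the header logic uses.
    final case class ColumnSketch(name: String, schemaOrdinal: Int)

    // Hypothetical stand-in for CarbonLoadDataCommand.
    final class ChildLoadCommandSketch(val table: String, val options: Map[String, String]) {
      var metadataProcessed = false
      def processMetadata(): Unit = { metadataProcessed = true }
    }

    object PreAggregateUtilSketch {
      // Assumed value; the real constant lives in CarbonCommonConstants.
      val DummyMeasure = "default_dummy_measure"

      // Builds the command only; it does not run any phase of it.
      def createLoadCommandForChild(
          columns: Seq[ColumnSketch],
          table: String): ChildLoadCommandSketch = {
        val headers = columns
          .filterNot(_.name.equalsIgnoreCase(DummyMeasure))
          .sortBy(_.schemaOrdinal)
          .map(_.name)
          .mkString(",")
        new ChildLoadCommandSketch(table, Map("fileheader" -> headers))
      }
    }
    ```

    The caller then writes `val cmd = createLoadCommandForChild(...)` followed by `cmd.processMetadata()`, so the side effect is visible at the call site rather than hidden inside a method named "create".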


---

[GitHub] carbondata pull request #1837: [WIP] Refactored code segregated process meta...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1837#discussion_r163760694
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala ---
    @@ -42,15 +42,11 @@ case class CarbonCreateTableAsSelectCommand(
         ifNotExistsSet: Boolean = false,
         tableLocation: Option[String] = None) extends AtomicRunnableCommand {
     
    -  /**
    -   * variable to be used for insert into command for checking whether the
    -   * table is created newly or already existed
    -   */
    -  var isTableCreated: Boolean = false
    -
    +  var loadCommand: CarbonInsertIntoCommand = _
    --- End diff --
   
    Add a new line here.


---