Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1837#discussion_r162910788

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---

```
@@ -649,6 +659,7 @@ case class CarbonLoadDataCommand(
     CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT)
     CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
     CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
+    CarbonSession.threadUnset("partition.opeartioncontext")
```

--- End diff --

Typo: `opeartioncontext` should be `operationcontext`. Better yet, move this key to CarbonCommonConstants.

---
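For illustration, a minimal sketch of the suggested cleanup, assuming the key is promoted to a shared constant; the constant and object names below are hypothetical, not what the project eventually chose:

```
// CarbonCommonConstants is a Java class in the real codebase; a Scala
// stand-in is shown here purely for illustration.
object CarbonCommonConstantsSketch {
  // A single definition prevents typos such as "opeartioncontext" drifting
  // between the threadSet and threadUnset call sites.
  val CARBON_PARTITION_OPERATION_CONTEXT: String = "partition.operationcontext"
}

// Call sites would then share the constant, e.g.:
// CarbonSession.threadSet(
//   CarbonCommonConstantsSketch.CARBON_PARTITION_OPERATION_CONTEXT, context)
// CarbonSession.threadUnset(
//   CarbonCommonConstantsSketch.CARBON_PARTITION_OPERATION_CONTEXT)
```

---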
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1837

@kumarvishal09 Please remove WIP from the title and add the JIRA ID if you are done with this PR.

---
Github user kumarvishal09 commented on the issue:
https://github.com/apache/carbondata/pull/1837

@ravipesala I am working on the auto merge; after that I will update the PR.

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1837

Build failed with Spark 2.2.1. Please check CI: http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1865/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1837

Build failed with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3096/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1837

SDV build failed. Please check CI: http://144.76.159.231:8080/job/ApacheSDVTests/3078/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1837

Build failed with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3097/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1837

Build failed with Spark 2.2.1. Please check CI: http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1867/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1837

SDV build succeeded. Please check CI: http://144.76.159.231:8080/job/ApacheSDVTests/3079/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1837

Build failed with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3098/

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1837#discussion_r163751453

--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala ---

```
@@ -57,30 +58,16 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
       CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
       carbonLoadModel.getDatabaseName + "." +
       carbonLoadModel.getTableName, "false")
-    val headers = carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
-      .map(_.getColumnName).mkString(",")
-    // Creating a new query string to insert data into pre-aggregate table from that same table.
-    // For example: To compact preaggtable1 we can fire a query like insert into preaggtable1
-    // select * from preaggtable1
-    // The following code will generate the select query with a load UDF that will be used to
-    // apply DataLoadingRules
-    val childDataFrame = sqlContext.sparkSession.sql(new CarbonSpark2SqlParser()
-      // adding the aggregation load UDF
-      .addPreAggLoadFunction(
-      // creating the select query on the bases on table schema
-      PreAggregateUtil.createChildSelectQuery(
-        carbonTable.getTableInfo.getFactTable, carbonTable.getDatabaseName))).drop("preAggLoad")
+    CarbonSession.updateSessionInfoToCurrentThread(sqlContext.sparkSession)
+    val loadCommand = operationContext.getProperty(carbonTable.getTableName + "_Compaction")
+      .asInstanceOf[CarbonLoadDataCommand]
     try {
-      CarbonLoadDataCommand(
-        Some(carbonTable.getDatabaseName),
-        carbonTable.getTableName,
-        null,
-        Nil,
-        Map("fileheader" -> headers),
-        isOverwriteTable = false,
-        dataFrame = Some(childDataFrame),
-        internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true",
-          "mergedSegmentName" -> mergedLoadName)).run(sqlContext.sparkSession)
+      val newInternalOptions = loadCommand.internalOptions ++
+        Map("mergedSegmentName" -> mergedLoadName)
+      loadCommand.internalOptions = newInternalOptions
+      loadCommand.dataFrame = Some(PreAggregateUtil
```

--- End diff --

Please correct the format:

```
loadCommand.dataFrame = Some(PreAggregateUtil.getDataFrame(
  sqlContext.sparkSession, loadCommand.logicalPlan.get))
```

---
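As an aside, a self-contained sketch of the handoff pattern the new code relies on: a listener prepares a load command and stashes it in the OperationContext under a per-table key, and the compactor retrieves it later. The types below are simplified stand-ins, not CarbonData's real classes:

```
import scala.collection.mutable

// Simplified stand-in for org.apache.carbondata.events.OperationContext.
class OperationContextSketch {
  private val properties = mutable.Map[String, Any]()
  def setProperty(key: String, value: Any): Unit = properties(key) = value
  def getProperty(key: String): Any = properties.getOrElse(key, null)
}

object HandoffDemo extends App {
  val ctx = new OperationContextSketch
  // Producer (the metadata listener) keys the prepared command by table name:
  ctx.setProperty("preaggtable1" + "_Compaction", "prepared-load-command")
  // Consumer (the compactor) retrieves it and casts it back:
  val cmd = ctx.getProperty("preaggtable1" + "_Compaction").asInstanceOf[String]
  println(cmd) // prints: prepared-load-command
}
```

---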
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1837#discussion_r163753157

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---

```
@@ -53,42 +54,50 @@ import org.apache.carbondata.streaming.segment.StreamSegment
  */
 case class CarbonAlterTableCompactionCommand(
     alterTableModel: AlterTableModel,
-    tableInfoOp: Option[TableInfo] = None)
-  extends DataCommand {
+    tableInfoOp: Option[TableInfo] = None,
+    val operationContext: OperationContext = new OperationContext )
+  extends AtomicRunnableCommand {
```

--- End diff --

Add a blank line here.

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1837#discussion_r163753419

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---

```
@@ -53,42 +54,50 @@ import org.apache.carbondata.streaming.segment.StreamSegment
  */
 case class CarbonAlterTableCompactionCommand(
     alterTableModel: AlterTableModel,
-    tableInfoOp: Option[TableInfo] = None)
-  extends DataCommand {
+    tableInfoOp: Option[TableInfo] = None,
+    val operationContext: OperationContext = new OperationContext )
+  extends AtomicRunnableCommand {
+  var table: CarbonTable = _
 
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER: LogService =
-      LogServiceFactory.getLogService(this.getClass.getName)
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val tableName = alterTableModel.tableName.toLowerCase
-    val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
-
-    val table = if (tableInfoOp.isDefined) {
-      val tableInfo = tableInfoOp.get
-      // To DO: CarbonEnv.updateStorePath
-      CarbonTable.buildFromTableInfo(tableInfo)
+    val dbName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
+    table = if (tableInfoOp.isDefined) {
+      CarbonTable.buildFromTableInfo(tableInfoOp.get)
     } else {
-      val relation =
-        CarbonEnv.getInstance(sparkSession).carbonMetastore
-          .lookupRelation(Option(databaseName), tableName)(sparkSession)
-          .asInstanceOf[CarbonRelation]
+      val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+        .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
       if (relation == null) {
-        throw new NoSuchTableException(databaseName, tableName)
+        throw new NoSuchTableException(dbName, tableName)
      }
      if (null == relation.carbonTable) {
-        LOGGER.error(s"alter table failed. table not found: $databaseName.$tableName")
-        throw new NoSuchTableException(databaseName, tableName)
+        LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
+        LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
```

--- End diff --

Remove the duplicate audit log.

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1837#discussion_r163757369

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---

```
@@ -53,42 +54,50 @@ import org.apache.carbondata.streaming.segment.StreamSegment
  */
 case class CarbonAlterTableCompactionCommand(
     alterTableModel: AlterTableModel,
-    tableInfoOp: Option[TableInfo] = None)
-  extends DataCommand {
+    tableInfoOp: Option[TableInfo] = None,
+    val operationContext: OperationContext = new OperationContext )
+  extends AtomicRunnableCommand {
+  var table: CarbonTable = _
 
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER: LogService =
-      LogServiceFactory.getLogService(this.getClass.getName)
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val tableName = alterTableModel.tableName.toLowerCase
-    val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
-
-    val table = if (tableInfoOp.isDefined) {
-      val tableInfo = tableInfoOp.get
-      // To DO: CarbonEnv.updateStorePath
-      CarbonTable.buildFromTableInfo(tableInfo)
+    val dbName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
+    table = if (tableInfoOp.isDefined) {
+      CarbonTable.buildFromTableInfo(tableInfoOp.get)
     } else {
-      val relation =
-        CarbonEnv.getInstance(sparkSession).carbonMetastore
-          .lookupRelation(Option(databaseName), tableName)(sparkSession)
-          .asInstanceOf[CarbonRelation]
+      val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+        .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
       if (relation == null) {
-        throw new NoSuchTableException(databaseName, tableName)
+        throw new NoSuchTableException(dbName, tableName)
      }
      if (null == relation.carbonTable) {
-        LOGGER.error(s"alter table failed. table not found: $databaseName.$tableName")
-        throw new NoSuchTableException(databaseName, tableName)
+        LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
+        LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
+        throw new NoSuchTableException(dbName, tableName)
      }
      relation.carbonTable
    }
+    if (CarbonUtil.hasAggregationDataMap(table) ||
+        (table.isChildDataMap && null == operationContext.getProperty(table.getTableName))) {
```

--- End diff --

Irrespective of this condition, I guess we always need to fire the event. If the listener does not want to execute, then move this if condition into the listener instead of here.

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1837#discussion_r163757600

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---

```
@@ -87,12 +84,41 @@ case class CarbonLoadDataCommand(
     options: scala.collection.immutable.Map[String, String],
     isOverwriteTable: Boolean,
     var inputSqlString: String = null,
-    dataFrame: Option[DataFrame] = None,
+    var dataFrame: Option[DataFrame] = None,
     updateModel: Option[UpdateTableModel] = None,
     var tableInfoOp: Option[TableInfo] = None,
-    internalOptions: Map[String, String] = Map.empty,
-    partition: Map[String, Option[String]] = Map.empty) extends DataCommand {
+    var internalOptions: Map[String, String] = Map.empty,
+    partition: Map[String, Option[String]] = Map.empty,
+    logicalPlan: Option[LogicalPlan] = None,
+    var operationContext: OperationContext = new OperationContext) extends AtomicRunnableCommand {
 
+  var table: CarbonTable = _
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
+    table = if (tableInfoOp.isDefined) {
+      CarbonTable.buildFromTableInfo(tableInfoOp.get)
+    } else {
+      val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+        .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+      if (relation == null) {
+        throw new NoSuchTableException(dbName, tableName)
+      }
+      if (null == relation.carbonTable) {
+        LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
+        LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
+        throw new NoSuchTableException(dbName, tableName)
+      }
+      relation.carbonTable
+    }
+    operationContext.setProperty("isOverwrite", isOverwriteTable)
+    if (CarbonUtil.hasAggregationDataMap(table)) {
+      val loadMetadataEvent = new LoadMetadataEvent(table, false)
+      OperationListenerBus.getInstance().fireEvent(loadMetadataEvent, operationContext)
```

--- End diff --

Always fire the event here; handle the if condition inside the registered listener.

---
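A self-contained sketch of the control flow suggested above: fire the event unconditionally and let each listener guard itself. The types here are simplified stand-ins for CarbonData's event classes, not the real API:

```
trait Event
case class LoadMetadataEvent(hasAggregateDataMap: Boolean) extends Event

trait OperationEventListener {
  def onEvent(event: Event): Unit
}

object LoadProcessMetaListenerSketch extends OperationEventListener {
  override def onEvent(event: Event): Unit = event match {
    // The guard lives in the listener, not in the command that fires the event.
    case e: LoadMetadataEvent if e.hasAggregateDataMap =>
      println("preparing child datamap load commands")
    case _ => () // listener opts out; the firing command needs no special case
  }
}

object ListenerBusSketch {
  private val listeners = List[OperationEventListener](LoadProcessMetaListenerSketch)
  def fireEvent(event: Event): Unit = listeners.foreach(_.onEvent(event))
}

object FireDemo extends App {
  // The command side always fires; there is no if-condition at the call site.
  ListenerBusSketch.fireEvent(LoadMetadataEvent(hasAggregateDataMap = true))
  ListenerBusSketch.fireEvent(LoadMetadataEvent(hasAggregateDataMap = false))
}
```

---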
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1837#discussion_r163757666

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---

```
@@ -649,6 +659,7 @@ case class CarbonLoadDataCommand(
     CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT)
     CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
     CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
+    CarbonSession.threadUnset("partition.opeartioncontext")
```

--- End diff --

Please take this to CarbonCommonConstants.

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1837#discussion_r163757962

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---

```
@@ -20,18 +20,141 @@ package org.apache.spark.sql.execution.command.preaaggregate
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-import org.apache.spark.sql.CarbonEnv
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.AlterTableModel
-import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand
+import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonLoadDataCommand}
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.events._
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
 
+object CompactionProcessMetaListener extends OperationEventListener {
```

--- End diff --

Add a comment describing the usage.

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1837#discussion_r163758021

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---

```
@@ -20,18 +20,141 @@ package org.apache.spark.sql.execution.command.preaaggregate
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-import org.apache.spark.sql.CarbonEnv
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.AlterTableModel
-import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand
+import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonLoadDataCommand}
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.events._
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
 
+object CompactionProcessMetaListener extends OperationEventListener {
+  /**
+   * Called on a specified event occurrence
+   *
+   * @param event
+   * @param operationContext
+   */
+  override protected def onEvent(event: Event,
+      operationContext: OperationContext): Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
+    val tableEvent = event.asInstanceOf[LoadMetadataEvent]
+    val table = tableEvent.getCarbonTable
+    if (!table.isChildDataMap && CarbonUtil.hasAggregationDataMap(table)) {
+      val aggregationDataMapList = table.getTableInfo.getDataMapSchemaList.asScala
+        .filter(_.isInstanceOf[AggregationDataMapSchema])
+        .asInstanceOf[mutable.ArrayBuffer[AggregationDataMapSchema]]
+      for (dataMapSchema: AggregationDataMapSchema <- aggregationDataMapList) {
+        val childTableName = dataMapSchema.getRelationIdentifier.getTableName
+        val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName
+        // Creating a new query string to insert data into pre-aggregate table from that same table.
+        // For example: To compact preaggtable1 we can fire a query like insert into preaggtable1
+        // select * from preaggtable1
+        // The following code will generate the select query with a load UDF that will be used to
+        // apply DataLoadingRules
+        val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser()
+          // adding the aggregation load UDF
+          .addPreAggLoadFunction(
+          // creating the select query on the bases on table schema
+          PreAggregateUtil.createChildSelectQuery(
+            dataMapSchema.getChildSchema, table.getDatabaseName))).drop("preAggLoad")
+        val loadCommand = PreAggregateUtil.createLoadCommandForChild(
+          dataMapSchema.getChildSchema.getListOfColumns,
+          TableIdentifier(childTableName, Some(childDatabaseName)),
+          childDataFrame,
+          false,
+          sparkSession)
+        operationContext
+          .setProperty(dataMapSchema.getChildSchema.getTableName + "_Compaction", loadCommand)
+      }
+    } else {
+      val childTableName = table.getTableName
+      val childDatabaseName = table.getDatabaseName
+      // Creating a new query string to insert data into pre-aggregate table from that same table.
+      // For example: To compact preaggtable1 we can fire a query like insert into preaggtable1
+      // select * from preaggtable1
+      // The following code will generate the select query with a load UDF that will be used to
+      // apply DataLoadingRules
+      val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser()
+        // adding the aggregation load UDF
+        .addPreAggLoadFunction(
+        // creating the select query on the bases on table schema
+        PreAggregateUtil.createChildSelectQuery(
+          table.getTableInfo.getFactTable, table.getDatabaseName))).drop("preAggLoad")
+      val loadCommand = PreAggregateUtil.createLoadCommandForChild(
+        table.getTableInfo.getFactTable.getListOfColumns,
+        TableIdentifier(childTableName, Some(childDatabaseName)),
+        childDataFrame,
+        false,
+        sparkSession)
+      operationContext.setProperty(table.getTableName + "_Compaction", loadCommand)
+    }
+  }
+}
+
+object LoadProcessMetaListener extends OperationEventListener {
```

--- End diff --

Add a comment.

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1837#discussion_r163760282

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---

```
@@ -900,4 +876,33 @@ object PreAggregateUtil {
         aggDataMapSchema.getProperties.get("CHILD_SELECT QUERY").replace("&", "=")),
       CarbonCommonConstants.DEFAULT_CHARSET)
   }
+
+  /**
+   * This method will start load process on the data map
+   */
+  def createLoadCommandForChild(
+      columns: java.util.List[ColumnSchema],
+      dataMapIdentifier: TableIdentifier,
+      dataFrame: DataFrame,
+      isOverwrite: Boolean,
+      sparkSession: SparkSession): CarbonLoadDataCommand = {
+    val headers = columns.asScala.filter { column =>
+      !column.getColumnName.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)
+    }.sortBy(_.getSchemaOrdinal).map(_.getColumnName).mkString(",")
+    val loadCommand = CarbonLoadDataCommand(dataMapIdentifier.database,
+      dataMapIdentifier.table,
+      null,
+      Nil,
+      Map("fileheader" -> headers),
+      isOverwriteTable = isOverwrite,
+      dataFrame = None,
+      internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true"),
+      logicalPlan = Some(dataFrame.queryExecution.logical))
+    loadCommand.processMetadata(sparkSession)
```

--- End diff --

Better not to process here; just return the command and call `processMetadata(sparkSession)` in the caller. That is more meaningful, since the method name says `createLoadCommandForChild`.

---
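A minimal sketch of the suggested split, with stand-in types rather than CarbonData's real classes: the factory only constructs the command, and the caller decides when `processMetadata` runs.

```
// Stand-in for CarbonLoadDataCommand, reduced to the lifecycle being discussed.
final case class LoadCommandSketch(table: String) {
  def processMetadata(): Unit = println(s"resolving metadata for $table")
}

object PreAggregateUtilSketch {
  // "create" does exactly what its name says: it builds and returns the
  // command without triggering any processing as a side effect.
  def createLoadCommandForChild(childTable: String): LoadCommandSketch =
    LoadCommandSketch(childTable)
}

object CallerDemo extends App {
  val loadCommand = PreAggregateUtilSketch.createLoadCommandForChild("preaggtable1")
  loadCommand.processMetadata() // the caller owns the lifecycle, keeping the name honest
}
```

---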
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1837#discussion_r163760694

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala ---

```
@@ -42,15 +42,11 @@ case class CarbonCreateTableAsSelectCommand(
     ifNotExistsSet: Boolean = false,
     tableLocation: Option[String] = None) extends AtomicRunnableCommand {
 
-  /**
-   * variable to be used for insert into command for checking whether the
-   * table is created newly or already existed
-   */
-  var isTableCreated: Boolean = false
-
+  var loadCommand: CarbonInsertIntoCommand = _
```

--- End diff --

Add a blank line here.

---