Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150480340

--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java ---
@@ -30,6 +30,8 @@

   private static final long serialVersionUID = 6577149126264181553L;

+  private String dataMapName;
--- End diff --

Is it unique within one table, or unique across all tables?

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150480875

--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala ---
@@ -31,18 +31,18 @@ class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll {

   test("create and drop preaggregate table") {
     sql(
-      "create table preagg1 stored BY 'carbondata' tblproperties('parent'='maintable') as select" +
+      "create datamap preagg1 on table maintable using 'preaggregate' as select" +
--- End diff --

Is there any testcase that covers datamap properties?

---
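For reference, a minimal sketch of a test that exercises a DMPROPERTIES clause. The table, columns, property key, and the child-table naming convention (parentTable_dataMapName) used here are illustrative assumptions, not taken from the PR:

  test("create datamap with dmproperties") {
    sql("drop table if exists maintable")
    sql("create table maintable (a string, b string, c int) stored by 'carbondata'")
    // the dmproperties entries are passed through to the datamap provider;
    // 'key'='value' is a placeholder, not a property defined by the PR
    sql(
      "create datamap preagg1 on table maintable using 'preaggregate' " +
      "dmproperties('key'='value') as select a, sum(c) from maintable group by a")
    // assumes the pre-aggregate child table is named parentTable_dataMapName
    checkExistence(sql("show tables"), true, "maintable_preagg1")
  }

---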
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150481038

--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala ---
@@ -0,0 +1,78 @@
+package org.apache.carbondata.spark.testsuite.datamap
--- End diff --

The Apache license header seems to be missing from this file.

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150481702

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CreateDataMapCommand.scala ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.datamap
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.command.preaaggregate.{CreatePreAggregateTableCommand, PreAggregateUtil}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+
+/**
+ * Below command class will be used to create datamap on table
+ * and updating the parent table about the datamap information
+ *
+ * @param queryString
+ */
+case class CreateDataMapCommand(
--- End diff --

Please add a `Carbon` prefix; having it on every command makes it easier to find all Carbon commands.

---
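The requested rename would only change the class name; the signature below is copied from the diff:

  // renamed per the review comment; parameters unchanged from the diff above
  case class CarbonCreateDataMapCommand(
      dataMapName: String,
      tableIdentifier: TableIdentifier,
      dmClassName: String,
      dmproperties: Map[String, String],
      queryString: Option[String])
    extends RunnableCommand with SchemaProcessCommand

---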
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150481737

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CreateDataMapCommand.scala ---
@@ -0,0 +1,69 @@ (same file as above; the relevant lines are at the end)
+  override def processSchema(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    if (dmClassName.equals("org.apache.carbondata.datamap.AggregateDataMapHandler") ||
+        dmClassName.equalsIgnoreCase("preaggregate")) {
+      CreatePreAggregateTableCommand(dataMapName,
+        tableIdentifier,
+        dmClassName,
+        dmproperties,
+        queryString.get).run(sparkSession)
+    } else {
+
--- End diff --

Remove the empty line.

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150481994

--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java ---
@@ -30,6 +30,8 @@

   private static final long serialVersionUID = 6577149126264181553L;

+  private String dataMapName;
--- End diff --

It is unique only to that table, not across tables.

---
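To illustrate the scoping (a sketch with assumed table and column names, not taken from the PR): the same datamap name can be reused on different parent tables, but not twice on the same table.

  sql("create datamap agg1 on table sales using 'preaggregate' " +
    "as select country, sum(amount) from sales group by country")    // ok
  sql("create datamap agg1 on table returns using 'preaggregate' " +
    "as select country, sum(amount) from returns group by country")  // ok: different parent table
  sql("create datamap agg1 on table sales using 'preaggregate' " +
    "as select country, max(amount) from sales group by country")    // should fail: agg1 already exists on sales

---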
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150483161

--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java ---
@@ -322,17 +322,22 @@
     for (DataMapSchema wrapperChildSchema : wrapperChildSchemaList) {
       org.apache.carbondata.format.DataMapSchema thriftChildSchema =
           new org.apache.carbondata.format.DataMapSchema();
-      org.apache.carbondata.format.RelationIdentifier relationIdentifier =
-          new org.apache.carbondata.format.RelationIdentifier();
-      relationIdentifier
-          .setDatabaseName(wrapperChildSchema.getRelationIdentifier().getDatabaseName());
-      relationIdentifier.setTableName(wrapperChildSchema.getRelationIdentifier().getTableName());
-      relationIdentifier.setTableId(wrapperChildSchema.getRelationIdentifier().getTableId());
-      thriftChildSchema.setRelationIdentifire(relationIdentifier);
+      if (wrapperChildSchema.getRelationIdentifier() != null) {
+        org.apache.carbondata.format.RelationIdentifier relationIdentifier =
+            new org.apache.carbondata.format.RelationIdentifier();
+        relationIdentifier
+            .setDatabaseName(wrapperChildSchema.getRelationIdentifier().getDatabaseName());
--- End diff --

ok

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150483293

--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala ---
@@ -0,0 +1,78 @@
+package org.apache.carbondata.spark.testsuite.datamap
--- End diff --

ok

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150483548

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CreateDataMapCommand.scala ---
@@ -0,0 +1,69 @@ (same diff as above)
+case class CreateDataMapCommand(
--- End diff --

ok

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150483631

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CreateDataMapCommand.scala ---
@@ -0,0 +1,69 @@ (same diff as above, ending at the empty line in the else branch)
+      queryString.get).run(sparkSession)
+    } else {
+
--- End diff --

ok

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150486928

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---
@@ -34,105 +32,74 @@ (large rewrite of CreatePreAggregateTableCommand; the relevant lines are at the end)
+    CarbonCreateTableCommand(tableModel).run(sparkSession)
+    try {
+      val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore.
+        lookupRelation(tableIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
+      val tableInfo = relation.tableMeta.carbonTable.getTableInfo
+      // child schema object which will be updated on parent table about the
+      val childSchema = tableInfo.getFactTable
+        .buildChildSchema(dataMapName, "", tableInfo.getDatabaseName, queryString, "AGGREGATION")
--- End diff --

Move `.buildChildSchema(` up to the previous line, and move the parameters down to the next line.

---
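For clarity, a sketch of the wrapping style the reviewer is asking for (same call as in the diff, rewrapped):

  // keep the method call on the declaration line; wrap the argument list instead
  val childSchema = tableInfo.getFactTable.buildChildSchema(
    dataMapName, "", tableInfo.getDatabaseName, queryString, "AGGREGATION")

---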
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150487023

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---
@@ -34,105 +32,74 @@ (same rewrite as above; the relevant lines are at the end)
+      val loadAvailable = PreAggregateUtil.checkMainTableLoad(parentTable)
+      if (loadAvailable) {
+        sparkSession
+          .sql(s"insert into ${ tableModel.databaseName }.${ tableModel.tableName } $queryString")
--- End diff --

Same here: move `.sql(` up to the previous line, and move the parameter down to the next line.

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150487031

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---
@@ -34,105 +32,74 @@ (same rewrite as above; the relevant lines are at the end)
+    } catch {
+      case e: Exception =>
+        sparkSession.
+          sql(s"""DROP TABLE IF EXISTS ${ tableModel.databaseName }.${ tableModel.tableName }""")
--- End diff --

Same here: move `sql(` up to the previous line, and move the parameter down to the next line.

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150487119

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
@@ -259,7 +260,7 @@ object PreAggregateUtil {
           precision = precision,
           scale = scale,
           rawSchema = rawSchema), dataMapField)
-      } else {
+} else {
--- End diff --

Wrong indentation.

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150487207

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---
@@ -121,6 +126,15 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     AlterTableCompactionCommand(altertablemodel)
   }

+  protected lazy val createDataMap: Parser[LogicalPlan] =
--- End diff --

Can you add a comment describing the syntax?

---
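A sketch of the kind of comment being requested, derived from the DDL used in the tests above; the optional clauses are assumptions and the PR's exact grammar may differ:

  /**
   * Parses a CREATE DATAMAP statement:
   *
   *   CREATE DATAMAP dataMapName ON TABLE tableName
   *   USING 'dmClassName'
   *   [DMPROPERTIES('key'='value', ...)]
   *   [AS queryString]
   */

---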
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150490273

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---
@@ -34,105 +32,74 @@ (same rewrite as above, ending at the buildChildSchema call)
--- End diff --

ok

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150490403

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---
@@ -34,105 +32,74 @@ (same rewrite as above, ending at the insert-into sql call)
--- End diff --

ok

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150490702

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
@@ -259,7 +260,7 @@ (same diff as above)
-      } else {
+} else {
--- End diff --

ok

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150490665

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---
@@ -34,105 +32,74 @@ (same rewrite as above, ending at the DROP TABLE sql call)
--- End diff --

ok

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150491412

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---
@@ -121,6 +126,15 @@ (same diff as above)
+  protected lazy val createDataMap: Parser[LogicalPlan] =
--- End diff --

ok

---