GitHub user manishgupta88 opened a pull request:
https://github.com/apache/carbondata/pull/1665

[CARBONDATA-1884] Add CTAS support to carbondata

Implemented the CTAS feature in carbondata. This will help to create a carbon table from other parquet/orc tables.

- [ ] Any interfaces changed?
  New DDL has been introduced. Syntax:
  **CREATE TABLE [IF NOT EXISTS] [db_name.]table_name stored by 'carbondata' AS select_statement**
- [ ] Any backward compatibility impacted?
  No
- [ ] Document update required?
  Yes
- [ ] Testing done
  Yes, added 12 functional test cases covering various CTAS scenarios
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
  NA

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/manishgupta88/carbondata CTAS_implementation

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/carbondata/pull/1665.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #1665

----
commit 1dc3df29d92d2cddcd73f6ea0c26140bd4a70ad6
Author: manishgupta88 <[hidden email]>
Date: 2017-12-13T16:10:19Z

Added code for implementing CTAS

----

---
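To make the DDL syntax quoted in the description concrete, here is a minimal sketch that renders a statement of that shape. The `CtasSyntax` object and its `ctasSql` helper are hypothetical and exist only for illustration; they are not part of CarbonData or of this pull request.

```scala
// Hypothetical helper (not CarbonData API): renders a CTAS statement that
// follows the syntax given in the pull request description.
object CtasSyntax {
  def ctasSql(
      tableName: String,
      selectStatement: String,
      dbName: Option[String] = None,
      ifNotExists: Boolean = false): String = {
    // Optional clause and optional database qualifier, as in the quoted syntax.
    val ifNotExistsClause = if (ifNotExists) "IF NOT EXISTS " else ""
    val qualifiedName = dbName.map(db => s"$db.$tableName").getOrElse(tableName)
    s"CREATE TABLE $ifNotExistsClause$qualifiedName STORED BY 'carbondata' AS $selectStatement"
  }
}
```

The resulting string would presumably be handed to a Carbon-enabled Spark session; the table and database names passed in are whatever the caller chooses.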
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1665

Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/775/

---
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on the issue:
https://github.com/apache/carbondata/pull/1665

retest this please

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1665

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2006/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1665

Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/780/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1665

SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2321/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1665

Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/785/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1665

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2018/

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1665#discussion_r157355991

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.table
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.MetadataCommand
+import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.TableInfo
+
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param query the query whose result will be insert into the new relation
+ * @param tableInfo the Table Describe, which may contains serde, storage handler etc.
+ * @param ifNotExistsSet allow continue working if it's already exists, otherwise
+ *                       raise exception
+ * @param tableLocation store location where the table need to be created
+ */
+case class CarbonCreateTableAsSelectCommand(query: LogicalPlan,
--- End diff --

move query as the last parameter of this command

---
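As a rough illustration of the reordering requested above, the sketch below keeps the fields and defaults shown in the diff and moves `query` to the end. `LogicalPlan` and `TableInfo` are replaced by small stand-in case classes (an assumption, so that the snippet compiles without Spark or CarbonData), and the enclosing `CtasParamOrder` object is hypothetical.

```scala
object CtasParamOrder {
  // Stand-ins for Spark's LogicalPlan and CarbonData's TableInfo (assumptions,
  // so that the sketch compiles without either library on the classpath).
  final case class LogicalPlan(description: String)
  final case class TableInfo(tableName: String)

  // Same fields and defaults as in the diff, with `query` moved to the end.
  final case class CarbonCreateTableAsSelectCommand(
      tableInfo: TableInfo,
      ifNotExistsSet: Boolean = false,
      tableLocation: Option[String] = None,
      query: LogicalPlan)

  // Because `query` now follows defaulted parameters, callers that rely on
  // the defaults have to pass it by name.
  val command: CarbonCreateTableAsSelectCommand = CarbonCreateTableAsSelectCommand(
    tableInfo = TableInfo("target"),
    query = LogicalPlan("SELECT * FROM source"))
}
```

One consequence worth weighing: with a non-defaulted parameter placed after defaulted ones, positional callers must spell out every argument, so named arguments become the natural calling style.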
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1665#discussion_r157356007

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.table
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.MetadataCommand
+import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.TableInfo
+
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param query the query whose result will be insert into the new relation
+ * @param tableInfo the Table Describe, which may contains serde, storage handler etc.
+ * @param ifNotExistsSet allow continue working if it's already exists, otherwise
+ *                       raise exception
+ * @param tableLocation store location where the table need to be created
+ */
+case class CarbonCreateTableAsSelectCommand(query: LogicalPlan,
+    tableInfo: TableInfo,
+    ifNotExistsSet: Boolean = false,
+    tableLocation: Option[String] = None) extends MetadataCommand {
--- End diff --

I think it should be AtomicRunnableCommand, since it does both data and metadata modification

---
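The reasoning behind this suggestion can be sketched with a simplified command abstraction that separates the metadata step from the data step. Everything below is a stand-in written for illustration: the `AtomicCommand` trait, its `undoMetadata` hook, and `CtasCommand` are assumptions of this sketch and do not reproduce the real CarbonData traits, which operate on a `SparkSession`.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

object AtomicCommandSketch {
  // Simplified stand-in for a command that touches both metadata and data.
  trait AtomicCommand {
    def processMetadata(): Seq[String]
    def processData(): Seq[String]

    // Hypothetical rollback hook, invoked when the data step fails.
    def undoMetadata(cause: Throwable): Unit = ()

    final def run(): Seq[String] = {
      val metadataResult = processMetadata()
      try {
        metadataResult ++ processData()
      } catch {
        case NonFatal(e) =>
          undoMetadata(e)
          throw e
      }
    }
  }

  // CTAS modelled as "register the table, then load it"; a failed load
  // removes the table from the (in-memory, illustrative) catalog again.
  final class CtasCommand(catalog: mutable.Set[String], table: String, loadFails: Boolean)
      extends AtomicCommand {
    override def processMetadata(): Seq[String] = {
      catalog += table
      Seq(s"created $table")
    }

    override def processData(): Seq[String] = {
      if (loadFails) throw new IllegalStateException(s"load into $table failed")
      Seq(s"loaded $table")
    }

    override def undoMetadata(cause: Throwable): Unit = {
      catalog -= table
      ()
    }
  }
}
```

The point of the split is that a failure during the load does not have to leave an empty table behind, which a metadata-only command cannot express.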
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1665#discussion_r157356062

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.table
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.MetadataCommand
+import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.TableInfo
+
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param query the query whose result will be insert into the new relation
+ * @param tableInfo the Table Describe, which may contains serde, storage handler etc.
+ * @param ifNotExistsSet allow continue working if it's already exists, otherwise
+ *                       raise exception
+ * @param tableLocation store location where the table need to be created
+ */
+case class CarbonCreateTableAsSelectCommand(query: LogicalPlan,
+    tableInfo: TableInfo,
+    ifNotExistsSet: Boolean = false,
+    tableLocation: Option[String] = None) extends MetadataCommand {
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val tableName = tableInfo.getFactTable.getTableName
+    var databaseOpt: Option[String] = None
+    if (tableInfo.getDatabaseName != null) {
+      databaseOpt = Some(tableInfo.getDatabaseName)
+    }
+    val dbName = CarbonEnv.getDatabaseName(databaseOpt)(sparkSession)
+    LOGGER.audit(s"Request received for CTAS for $dbName.$tableName")
+    lazy val carbonDataSourceHadoopRelation = {
--- End diff --

This can be moved down to line 61

---
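The diff relies on `lazy val` so that the table is only created when the relation is first used. A minimal sketch of that behaviour, with the declaration placed next to its only use as the comment above suggests, could look as follows. The `LazyValDemo` object, the event log, and the string standing in for the relation are all assumptions made for illustration.

```scala
import scala.collection.mutable.ListBuffer

object LazyValDemo {
  // Records the order of side effects so the laziness is observable.
  def run(tableExists: Boolean): List[String] = {
    val events = ListBuffer.empty[String]
    if (tableExists) {
      events += "skip: table already exists"
    } else {
      // Declared right where it is needed; the block runs on first access only.
      lazy val relation: String = {
        events += "create table"
        "relation(db.t)"
      }
      events += s"insert into $relation"
    }
    events.toList
  }
}
```

Keeping the lazy definition inside the branch that consumes it makes it obvious that no table is created on the "already exists" path.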
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1665#discussion_r157356072

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.table
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.MetadataCommand
+import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.TableInfo
+
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param query the query whose result will be insert into the new relation
+ * @param tableInfo the Table Describe, which may contains serde, storage handler etc.
+ * @param ifNotExistsSet allow continue working if it's already exists, otherwise
+ *                       raise exception
+ * @param tableLocation store location where the table need to be created
+ */
+case class CarbonCreateTableAsSelectCommand(query: LogicalPlan,
+    tableInfo: TableInfo,
+    ifNotExistsSet: Boolean = false,
+    tableLocation: Option[String] = None) extends MetadataCommand {
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val tableName = tableInfo.getFactTable.getTableName
+    var databaseOpt: Option[String] = None
+    if (tableInfo.getDatabaseName != null) {
+      databaseOpt = Some(tableInfo.getDatabaseName)
+    }
+    val dbName = CarbonEnv.getDatabaseName(databaseOpt)(sparkSession)
+    LOGGER.audit(s"Request received for CTAS for $dbName.$tableName")
+    lazy val carbonDataSourceHadoopRelation = {
+      // execute command to create carbon table
+      CarbonCreateTableCommand(tableInfo, ifNotExistsSet, tableLocation).run(sparkSession)
+      CarbonEnv.getInstance(sparkSession).carbonMetastore
+        .getCarbonDataSourceHadoopRelation(sparkSession, TableIdentifier(tableName, Option(dbName)))
+    }
+    // check if table already exists
+    if (sparkSession.sessionState.catalog.listTables(dbName)
+      .exists(_.table.equalsIgnoreCase(tableName))) {
+      if (!ifNotExistsSet) {
+        LOGGER.audit(
+          s"Table creation with Database name [$dbName] and Table name [$tableName] failed. " +
+          s"Table [$tableName] already exists under database [$dbName]")
+        throw new TableAlreadyExistsException(dbName, tableName)
+      }
+    } else {
+      try {
+        // execute command to load data into carbon table
+        CarbonInsertIntoCommand(carbonDataSourceHadoopRelation,
--- End diff --

move parameter to next line

---
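The existence check in the quoted hunk boils down to three outcomes, which the following sketch models over a plain set of table names. It is only an approximation of the control flow visible in the diff: the `CtasExistenceCheck` object, its exception class, and the in-memory set are hypothetical stand-ins for the Spark catalog and `TableAlreadyExistsException`.

```scala
object CtasExistenceCheck {
  // Stand-in for Spark's TableAlreadyExistsException (assumption of this sketch).
  final class TableAlreadyExists(db: String, table: String)
      extends RuntimeException(s"Table $table already exists in database $db")

  // Returns the catalog contents after a CTAS request.
  def createTableAsSelect(
      existingTables: Set[String],
      db: String,
      table: String,
      ifNotExists: Boolean): Set[String] = {
    // Case-insensitive match, mirroring equalsIgnoreCase in the diff.
    val exists = existingTables.exists(_.equalsIgnoreCase(table))
    if (exists && !ifNotExists) throw new TableAlreadyExists(db, table)
    else if (exists) existingTables // IF NOT EXISTS: silently do nothing
    else existingTables + table     // create the table, then load it
  }
}
```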
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1665#discussion_r157356091

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---
@@ -528,4 +529,46 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val tableMetadataFile = tablePath.getSchemaFilePath
     CarbonUtil.readSchemaFile(tableMetadataFile)
   }
+
+  override def getCarbonDataSourceHadoopRelation(sparkSession: SparkSession,
--- End diff --

move parameter to next line

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1665#discussion_r157356103

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---
@@ -528,4 +529,46 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val tableMetadataFile = tablePath.getSchemaFilePath
     CarbonUtil.readSchemaFile(tableMetadataFile)
   }
+
+  override def getCarbonDataSourceHadoopRelation(sparkSession: SparkSession,
--- End diff --

please provide comment for this function

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1665#discussion_r157356121

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala ---
@@ -144,6 +145,15 @@ trait CarbonMetaStore {
   def getThriftTableInfo(tablePath: CarbonTablePath)(sparkSession: SparkSession): TableInfo
   def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable]
+
+  def getCarbonDataSourceHadoopRelation(sparkSession: SparkSession,
--- End diff --

move parameter to next line

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1665#discussion_r157356131

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala ---
@@ -144,6 +145,15 @@ trait CarbonMetaStore {
   def getThriftTableInfo(tablePath: CarbonTablePath)(sparkSession: SparkSession): TableInfo
   def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable]
+
+  def getCarbonDataSourceHadoopRelation(sparkSession: SparkSession,
+    tableIdentifier: TableIdentifier): CarbonDatasourceHadoopRelation
+
+  def getSchemaFromUnresolvedRelation(sparkSession: SparkSession,
--- End diff --

add comment for interface function

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1665#discussion_r157356143

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala ---
@@ -210,13 +211,40 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser)
       }
     }
-    val fields = parser.getFields(cols ++ partitionByStructFields)
+    var fields = parser.getFields(cols ++ partitionByStructFields)
     val options = new CarbonOption(properties)
     // validate tblProperties
     val bucketFields = parser.getBucketFields(tableProperties, fields, options)
     validateStreamingProperty(options)
+    // validate for create table as select
--- End diff --

move this logic to a validateSelectQueryForCTAS function

---
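The quoted hunk stops before the validation logic itself, so the following only sketches the shape such an extracted function might take. The function name comes from the review comment; the rule it enforces (no explicit column list together with a select query), the `Field` type, and the `Either` return type are assumptions made purely for illustration and may differ from what the pull request actually checks.

```scala
object CtasValidationSketch {
  // Stand-in for the parser's field type (assumption).
  final case class Field(name: String, dataType: String)

  // Hypothetical rule, for illustration only: a CTAS statement may not also
  // declare its own columns, because the schema comes from the select query.
  def validateSelectQueryForCTAS(
      selectQuery: Option[String],
      declaredFields: Seq[Field]): Either[String, Unit] =
    selectQuery match {
      case Some(_) if declaredFields.nonEmpty =>
        Left("CTAS: explicit column definitions are not allowed together with AS <select>")
      case _ =>
        Right(())
    }
}
```

Pulling the checks into one named function keeps the table-creation method readable and gives the CTAS rules a single place to be tested.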
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1665#discussion_r157356168

--- Diff: integration/spark2/src/main/spark2.2/CarbonSessionState.scala ---
@@ -280,25 +280,26 @@ class CarbonOptimizer(
   }
 }
-class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) extends
-  SparkSqlAstBuilder(conf) {
+class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
+  extends SparkSqlAstBuilder(conf) {
-  val helper = new CarbonHelperSqlAstBuilder(conf, parser)
+  val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
   override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = {
     val fileStorage = helper.getFileStorage(ctx.createFileFormat)
     if (fileStorage.equalsIgnoreCase("'carbondata'") ||
         fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
       helper.createCarbonTable(ctx.createTableHeader,
-          ctx.skewSpec,
-          ctx.bucketSpec,
-          ctx.partitionColumns,
-          ctx.columns,
-          ctx.tablePropertyList,
-          ctx.locationSpec(),
-          Option(ctx.STRING()).map(string),
-          ctx.AS)
+        ctx.skewSpec,
--- End diff --

Since you modify this function, to make it more readable, please add parameter name also, like
```
foo(
  paramA = a,
  paramB = b
)
```

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1665#discussion_r157356181

--- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
@@ -259,25 +259,26 @@ object CarbonOptimizerUtil {
   }
 }
-class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) extends
-  SparkSqlAstBuilder(conf) {
+class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
+  extends SparkSqlAstBuilder(conf) {
-  val helper = new CarbonHelperSqlAstBuilder(conf, parser)
+  val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
   override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
     val fileStorage = helper.getFileStorage(ctx.createFileFormat)
     if (fileStorage.equalsIgnoreCase("'carbondata'") ||
         fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
       helper.createCarbonTable(ctx.createTableHeader,
--- End diff --

Since you modify this function, to make it more readable, please add parameter name also, like
```
foo(
  paramA = a,
  paramB = b
  ...)
```

---
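To show what the named-parameter style requested above buys at a call site with many arguments, here is a small self-contained comparison. The `createTable` function and its parameter names are hypothetical; they merely stand in for a long-signature helper such as `createCarbonTable`, whose real parameter names are not visible in the quoted diff.

```scala
object NamedArgumentsSketch {
  // Hypothetical many-parameter helper (not the real createCarbonTable).
  def createTable(
      header: String,
      columns: Seq[String],
      location: Option[String],
      comment: Option[String],
      isCtas: Boolean): String = {
    val columnList = columns.mkString(", ")
    val where = location.getOrElse("<default>")
    val note = comment.getOrElse("<none>")
    s"$header [$columnList] at $where, comment $note, ctas=$isCtas"
  }

  // Positional call: the reader has to remember what None and true mean.
  val positional: String = createTable("t", Seq("a", "b"), None, Some("demo"), true)

  // Named call: every argument documents itself at the call site.
  val named: String = createTable(
    header = "t",
    columns = Seq("a", "b"),
    location = None,
    comment = Some("demo"),
    isCtas = true)
}
```

Both calls produce the same result; the named form is simply harder to get wrong when a parameter is later added or reordered.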
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1665#discussion_r157356189

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala ---
@@ -144,6 +145,15 @@ trait CarbonMetaStore {
   def getThriftTableInfo(tablePath: CarbonTablePath)(sparkSession: SparkSession): TableInfo
   def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable]
+
+  def getCarbonDataSourceHadoopRelation(sparkSession: SparkSession,
+    tableIdentifier: TableIdentifier): CarbonDatasourceHadoopRelation
+
+  def getSchemaFromUnresolvedRelation(sparkSession: SparkSession,
--- End diff --

This function is simple logic only, why not do this logic directly in caller?

---