Github user Indhumathi27 commented on the issue:
https://github.com/apache/carbondata/pull/2225 Retest sdv please --- |
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2225#discussion_r185398816 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/UsingCarbondataSuite.scala --- @@ -69,4 +73,11 @@ class UsingCarbondataSuite extends QueryTest with BeforeAndAfterEach { res3.foreach(row => assert(row.getString(1).trim.toLong > 0)) } + test("CARBONDATA-2396 Support Create Table As Select with 'using carbondata'") { + sql("CREATE TABLE src_carbondata3(key INT, value STRING) USING carbondata") + sql("INSERT INTO src_carbondata3 VALUES(1,'source')") + checkAnswer(sql("SELECT * FROM src_carbondata3"), Row(1, "source")) + sql("CREATE TABLE src_carbondata4 USING carbondata as select * from src_carbondata3") + checkAnswer(sql("SELECT * FROM src_carbondata4"), Row(1, "source")) + } --- End diff -- Add a test case for CTAS with CREATE TABLE IF NOT EXISTS ... AS SELECT --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2225 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4667/ --- |
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2225#discussion_r185481616 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala --- @@ -224,15 +224,32 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { && (tableDesc.provider.get.equals("org.apache.spark.sql.CarbonSource") || tableDesc.provider.get.equalsIgnoreCase("carbondata")) => val updatedCatalog = - CarbonSource.updateCatalogTableWithCarbonSchema(tableDesc, sparkSession) + CarbonSource.updateCatalogTableWithCarbonSchema(tableDesc, sparkSession, None) --- End diff -- take the default value as None in method updateCatalogTableWithCarbonSchema instead of passing from here --- |
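For illustration, a minimal sketch of the defaulting the reviewer suggests, assuming Spark's CatalogTable and LogicalPlan types; the body below is a placeholder, not CarbonSource's real schema-rewriting logic:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object CarbonSourceSketch {
  // Defaulting the new query parameter to None means the plain CREATE TABLE
  // paths can keep calling the two-argument form unchanged, while only the
  // new CTAS branches pass Option(query).
  def updateCatalogTableWithCarbonSchema(
      tableDesc: CatalogTable,
      sparkSession: SparkSession,
      query: Option[LogicalPlan] = None): CatalogTable = {
    // Placeholder body: the real method rewrites the catalog table with the
    // carbon schema; returning tableDesc keeps this sketch compilable.
    tableDesc
  }
}

With such a default in place, existing call sites like CarbonSource.updateCatalogTableWithCarbonSchema(tableDesc, sparkSession) in DDLStrategy compile unchanged.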
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2225#discussion_r185481687 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala --- @@ -224,15 +224,32 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { && (tableDesc.provider.get.equals("org.apache.spark.sql.CarbonSource") || tableDesc.provider.get.equalsIgnoreCase("carbondata")) => val updatedCatalog = - CarbonSource.updateCatalogTableWithCarbonSchema(tableDesc, sparkSession) + CarbonSource.updateCatalogTableWithCarbonSchema(tableDesc, sparkSession, None) val cmd = CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists = mode == SaveMode.Ignore) ExecutedCommandExec(cmd) :: Nil + case cmd@CreateDataSourceTableAsSelectCommand(tableDesc, mode, query) + if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER + && (tableDesc.provider.get.equals("org.apache.spark.sql.CarbonSource") + || tableDesc.provider.get.equalsIgnoreCase("carbondata")) => + val updatedCatalog = CarbonSource + .updateCatalogTableWithCarbonSchema(tableDesc, sparkSession, Option(query)) + val cmd = CreateCarbonSourceTableAsSelectCommand(updatedCatalog, SaveMode.Ignore, query) + ExecutedCommandExec(cmd) :: Nil + case cmd@org.apache.spark.sql.execution.datasources.CreateTable(tableDesc, mode, query) + if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER + && (tableDesc.provider.get.equals("org.apache.spark.sql.CarbonSource") + || tableDesc.provider.get.equalsIgnoreCase("carbondata")) => + val updatedCatalog = CarbonSource + .updateCatalogTableWithCarbonSchema(tableDesc, sparkSession, query) + val cmd = CreateCarbonSourceTableAsSelectCommand(updatedCatalog, SaveMode.Ignore, query.get) + ExecutedCommandExec(cmd) :: Nil case CreateDataSourceTableCommand(table, ignoreIfExists) if table.provider.get != DDLUtils.HIVE_PROVIDER && (table.provider.get.equals("org.apache.spark.sql.CarbonSource") || table.provider.get.equalsIgnoreCase("carbondata")) => - val updatedCatalog = CarbonSource.updateCatalogTableWithCarbonSchema(table, sparkSession) + val updatedCatalog = CarbonSource + .updateCatalogTableWithCarbonSchema(table, sparkSession, None) --- End diff -- same comment as above --- |
Github user Indhumathi27 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2225#discussion_r185483770 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala --- @@ -224,15 +224,32 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { && (tableDesc.provider.get.equals("org.apache.spark.sql.CarbonSource") || tableDesc.provider.get.equalsIgnoreCase("carbondata")) => val updatedCatalog = - CarbonSource.updateCatalogTableWithCarbonSchema(tableDesc, sparkSession) + CarbonSource.updateCatalogTableWithCarbonSchema(tableDesc, sparkSession, None) --- End diff -- okay --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2225 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5583/ --- |
Github user xubo245 commented on the issue:
https://github.com/apache/carbondata/pull/2225 USING org.apache.spark.sql.CarbonSource also has the CTAS problem; please add a test case for it and update the description of this PR --- |
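A sketch of such a test, as it might sit inside UsingCarbondataSuite (sql, checkAnswer and Row come from the surrounding QueryTest scaffolding; the table names here are invented for illustration, not taken from the PR):

test("CTAS with fully-qualified 'USING org.apache.spark.sql.CarbonSource'") {
  sql("DROP TABLE IF EXISTS ctas_fq_src")
  sql("DROP TABLE IF EXISTS ctas_fq_dest")
  // Create and populate a source table using the fully-qualified provider name
  sql("CREATE TABLE ctas_fq_src(key INT, value STRING) USING org.apache.spark.sql.CarbonSource")
  sql("INSERT INTO ctas_fq_src VALUES(1,'source')")
  // CTAS through the same fully-qualified provider should hit the new code path
  sql("CREATE TABLE ctas_fq_dest USING org.apache.spark.sql.CarbonSource " +
    "AS SELECT * FROM ctas_fq_src")
  checkAnswer(sql("SELECT * FROM ctas_fq_dest"), Row(1, "source"))
  sql("DROP TABLE IF EXISTS ctas_fq_src")
  sql("DROP TABLE IF EXISTS ctas_fq_dest")
}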
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2225#discussion_r185489456 --- Diff: integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala --- @@ -0,0 +1,170 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{ + CatalogRelation, CatalogTable, CatalogTableType, + SimpleCatalogRelation +} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.command.{ + AlterTableRecoverPartitionsCommand, DDLUtils, + RunnableCommand +} +import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.sources.InsertableRelation +import org.apache.spark.sql.types.StructType + +/** + * Create table 'using carbondata' and insert the query result into it. + * + * @param table the Catalog Table + * @param mode SaveMode:Ignore,OverWrite,ErrorIfExists,Append + * @param query the query whose result will be insert into the new relation + * + */ + +case class CreateCarbonSourceTableAsSelectCommand( + table: CatalogTable, + mode: SaveMode, + query: LogicalPlan) + extends RunnableCommand { + + override protected def innerChildren: Seq[LogicalPlan] = Seq(query) + + override def run(sparkSession: SparkSession): Seq[Row] = { + assert(table.tableType != CatalogTableType.VIEW) + assert(table.provider.isDefined) + assert(table.schema.isEmpty) + + val provider = table.provider.get + val sessionState = sparkSession.sessionState + val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase) + val tableIdentWithDB = table.identifier.copy(database = Some(db)) + val tableName = tableIdentWithDB.unquotedString + + var createMetastoreTable = false + var existingSchema = Option.empty[StructType] + if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) { + // Check if we need to throw an exception or just return. + mode match { + case SaveMode.ErrorIfExists => + throw new AnalysisException(s"Table $tableName already exists. " + + s"If you are using saveAsTable, you can set SaveMode to " + + s"SaveMode.Append to " + + s"insert data into the table or set SaveMode to SaveMode" + + s".Overwrite to overwrite" + + s"the existing data. " + + s"Or, if you are using SQL CREATE TABLE, you need to drop " + + s"$tableName first.") + case SaveMode.Ignore => + // Since the table already exists and the save mode is Ignore, we will just return. + return Seq.empty[Row] + case SaveMode.Append => + // Check if the specified data source match the data source of the existing table. 
+ val existingProvider = DataSource.lookupDataSource(provider) + // TODO: Check that options from the resolved relation match the relation that we are + // inserting into (i.e. using the same compression). + + // Pass a table identifier with database part, so that `lookupRelation` won't get temp + // views unexpectedly. + EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) match { + case l@LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) => + // check if the file formats match + l.relation match { + case r: HadoopFsRelation if r.fileFormat.getClass != existingProvider => + throw new AnalysisException( + s"The file format of the existing table $tableName is " + + s"`${ r.fileFormat.getClass.getName }`. It doesn't match the specified " + + s"format `$provider`") + case _ => + } + if (query.schema.size != l.schema.size) { + throw new AnalysisException( + s"The column number of the existing schema[${ l.schema }] " + + s"doesn't match the data schema[${ query.schema }]'s") + } + existingSchema = Some(l.schema) + case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) => + existingSchema = Some(s.metadata.schema) + case c: CatalogRelation if c.catalogTable.provider == Some(DDLUtils.HIVE_PROVIDER) => + throw new AnalysisException("Saving data in the Hive serde table " + + s"${ + c.catalogTable --- End diff -- please keep {} in one line --- |
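By way of example, a minimal sketch of the requested style, with the interpolated expression kept inside one-line ${...} braces. The helper object, method, and message are hypothetical; the package declaration is only there because AnalysisException's constructor is package-private to org.apache.spark.sql:

package org.apache.spark.sql.hive

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier

object InterpolationStyleSketch {
  // Hypothetical helper: the ${...} interpolation stays on a single line
  // instead of spilling the expression across several lines.
  def rejectHiveSerdeTable(ident: TableIdentifier): Nothing = {
    throw new AnalysisException(
      s"Saving data in the Hive serde table ${ident.unquotedString} is not supported yet.")
  }
}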
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2225#discussion_r185492584 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/UsingCarbondataSuite.scala --- @@ -69,4 +73,44 @@ class UsingCarbondataSuite extends QueryTest with BeforeAndAfterEach { res3.foreach(row => assert(row.getString(1).trim.toLong > 0)) } + test("CARBONDATA-2396 Support Create Table As Select with 'using carbondata'") { + sql("CREATE TABLE src_carbondata3(key INT, value STRING) USING carbondata") + sql("INSERT INTO src_carbondata3 VALUES(1,'source')") + checkAnswer(sql("SELECT * FROM src_carbondata3"), Row(1, "source")) + sql("CREATE TABLE src_carbondata4 USING carbondata as select * from src_carbondata3") + checkAnswer(sql("SELECT * FROM src_carbondata4"), Row(1, "source")) + } + + test("CARBONDATA-2396 Support Create Table As Select [IF NOT EXISTS] with 'using carbondata'") { + sql("DROP TABLE IF EXISTS src_carbondata5") + sql("DROP TABLE IF EXISTS src_carbondata6") + sql("CREATE TABLE src_carbondata5(key INT, value STRING) USING carbondata") + sql("INSERT INTO src_carbondata5 VALUES(1,'source')") + checkAnswer(sql("SELECT * FROM src_carbondata5"), Row(1, "source")) + sql( + "CREATE TABLE IF NOT EXISTS src_carbondata6 USING carbondata as select * from " + + "src_carbondata5") + checkAnswer(sql("SELECT * FROM src_carbondata6"), Row(1, "source")) + sql("DROP TABLE IF EXISTS src_carbondata5") + sql("DROP TABLE IF EXISTS src_carbondata6") + } + + test( + "CARBONDATA-2396 Support Create Table As Select with 'using carbondata' with Table properties") --- End diff -- please keep it in one line --- |
Github user Indhumathi27 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2225#discussion_r185494171 --- Diff: integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala --- @@ -0,0 +1,170 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{ + CatalogRelation, CatalogTable, CatalogTableType, + SimpleCatalogRelation +} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.command.{ + AlterTableRecoverPartitionsCommand, DDLUtils, + RunnableCommand +} +import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.sources.InsertableRelation +import org.apache.spark.sql.types.StructType + +/** + * Create table 'using carbondata' and insert the query result into it. + * + * @param table the Catalog Table + * @param mode SaveMode:Ignore,OverWrite,ErrorIfExists,Append + * @param query the query whose result will be insert into the new relation + * + */ + +case class CreateCarbonSourceTableAsSelectCommand( + table: CatalogTable, + mode: SaveMode, + query: LogicalPlan) + extends RunnableCommand { + + override protected def innerChildren: Seq[LogicalPlan] = Seq(query) + + override def run(sparkSession: SparkSession): Seq[Row] = { + assert(table.tableType != CatalogTableType.VIEW) + assert(table.provider.isDefined) + assert(table.schema.isEmpty) + + val provider = table.provider.get + val sessionState = sparkSession.sessionState + val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase) + val tableIdentWithDB = table.identifier.copy(database = Some(db)) + val tableName = tableIdentWithDB.unquotedString + + var createMetastoreTable = false + var existingSchema = Option.empty[StructType] + if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) { + // Check if we need to throw an exception or just return. + mode match { + case SaveMode.ErrorIfExists => + throw new AnalysisException(s"Table $tableName already exists. " + + s"If you are using saveAsTable, you can set SaveMode to " + + s"SaveMode.Append to " + + s"insert data into the table or set SaveMode to SaveMode" + + s".Overwrite to overwrite" + + s"the existing data. " + + s"Or, if you are using SQL CREATE TABLE, you need to drop " + + s"$tableName first.") + case SaveMode.Ignore => + // Since the table already exists and the save mode is Ignore, we will just return. + return Seq.empty[Row] + case SaveMode.Append => + // Check if the specified data source match the data source of the existing table. 
+ val existingProvider = DataSource.lookupDataSource(provider) + // TODO: Check that options from the resolved relation match the relation that we are + // inserting into (i.e. using the same compression). + + // Pass a table identifier with database part, so that `lookupRelation` won't get temp + // views unexpectedly. + EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) match { + case l@LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) => + // check if the file formats match + l.relation match { + case r: HadoopFsRelation if r.fileFormat.getClass != existingProvider => + throw new AnalysisException( + s"The file format of the existing table $tableName is " + + s"`${ r.fileFormat.getClass.getName }`. It doesn't match the specified " + + s"format `$provider`") + case _ => + } + if (query.schema.size != l.schema.size) { + throw new AnalysisException( + s"The column number of the existing schema[${ l.schema }] " + + s"doesn't match the data schema[${ query.schema }]'s") + } + existingSchema = Some(l.schema) + case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) => + existingSchema = Some(s.metadata.schema) + case c: CatalogRelation if c.catalogTable.provider == Some(DDLUtils.HIVE_PROVIDER) => + throw new AnalysisException("Saving data in the Hive serde table " + + s"${ + c.catalogTable --- End diff -- okay --- |
Github user Indhumathi27 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2225#discussion_r185501934 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/UsingCarbondataSuite.scala --- @@ -69,4 +73,44 @@ class UsingCarbondataSuite extends QueryTest with BeforeAndAfterEach { res3.foreach(row => assert(row.getString(1).trim.toLong > 0)) } + test("CARBONDATA-2396 Support Create Table As Select with 'using carbondata'") { + sql("CREATE TABLE src_carbondata3(key INT, value STRING) USING carbondata") + sql("INSERT INTO src_carbondata3 VALUES(1,'source')") + checkAnswer(sql("SELECT * FROM src_carbondata3"), Row(1, "source")) + sql("CREATE TABLE src_carbondata4 USING carbondata as select * from src_carbondata3") + checkAnswer(sql("SELECT * FROM src_carbondata4"), Row(1, "source")) + } + + test("CARBONDATA-2396 Support Create Table As Select [IF NOT EXISTS] with 'using carbondata'") { + sql("DROP TABLE IF EXISTS src_carbondata5") + sql("DROP TABLE IF EXISTS src_carbondata6") + sql("CREATE TABLE src_carbondata5(key INT, value STRING) USING carbondata") + sql("INSERT INTO src_carbondata5 VALUES(1,'source')") + checkAnswer(sql("SELECT * FROM src_carbondata5"), Row(1, "source")) + sql( + "CREATE TABLE IF NOT EXISTS src_carbondata6 USING carbondata as select * from " + + "src_carbondata5") + checkAnswer(sql("SELECT * FROM src_carbondata6"), Row(1, "source")) + sql("DROP TABLE IF EXISTS src_carbondata5") + sql("DROP TABLE IF EXISTS src_carbondata6") + } + + test( + "CARBONDATA-2396 Support Create Table As Select with 'using carbondata' with Table properties") --- End diff -- @xubo245 "USING org.apache.spark.sql.CarbonSource" has only been changed to "USING carbondata" --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2225 Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4422/ --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2225 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4676/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2225 Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4430/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2225 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5591/ --- |
Github user Indhumathi27 commented on the issue:
https://github.com/apache/carbondata/pull/2225 Retest this please --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2225 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5593/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2225 Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4432/ --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2225 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4684/ --- |