Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153062720 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala --- @@ -184,10 +126,86 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) { } } - private def needToConvertToLowerCase(key: String): Boolean = { - val noConvertList = Array("LIST_INFO", "RANGE_INFO") - !noConvertList.exists(x => x.equalsIgnoreCase(key)); + def getPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] + = { + Option(ctx).map(visitPropertyKeyValues) + .getOrElse(Map.empty) } + def createCarbontable(tableHeader: CreateTableHeaderContext, + skewSpecContext: SkewSpecContext, + bucketSpecContext: BucketSpecContext, + partitionColumns: ColTypeListContext, + columns : ColTypeListContext, + tablePropertyList : TablePropertyListContext) : LogicalPlan = { + // val parser = new CarbonSpark2SqlParser + + val (name, temp, ifNotExists, external) = visitCreateTableHeader(tableHeader) + // TODO: implement temporary tables + if (temp) { + throw new ParseException( + "CREATE TEMPORARY TABLE is not supported yet. " + + "Please use CREATE TEMPORARY VIEW as an alternative.", tableHeader) + } + if (skewSpecContext != null) { + operationNotAllowed("CREATE TABLE ... SKEWED BY", skewSpecContext) + } + if (bucketSpecContext != null) { + operationNotAllowed("CREATE TABLE ... CLUSTERED BY", bucketSpecContext) + } + val partitionByStructFields = Option(partitionColumns).toSeq.flatMap(visitColTypeList) + val partitionerFields = partitionByStructFields.map { structField => + PartitionerField(structField.name, Some(structField.dataType.toString), null) + } + val cols = Option(columns).toSeq.flatMap(visitColTypeList) + val properties = getPropertyKeyValues(tablePropertyList) + + // Ensuring whether no duplicate name is used in table definition + val colNames = cols.map(_.name) + if (colNames.length != colNames.distinct.length) { + val duplicateColumns = colNames.groupBy(identity).collect { + case (x, ys) if ys.length > 1 => "\"" + x + "\"" + } + operationNotAllowed(s"Duplicated column names found in table definition of $name: " + --- End diff -- Indentation is wrong --- |
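For reference, here is how the flagged block might be re-indented in the prevailing two-space continuation style. This is a sketch only: the tail of the error message and the context argument handed to `operationNotAllowed` are assumptions, since the quoted diff cuts off mid-statement.

```scala
// Duplicate-name check from the diff, re-indented; the mkString tail and
// the tableHeader context argument are assumed, not taken from the PR.
val colNames = cols.map(_.name)
if (colNames.length != colNames.distinct.length) {
  val duplicateColumns = colNames.groupBy(identity).collect {
    case (x, ys) if ys.length > 1 => "\"" + x + "\""
  }
  operationNotAllowed(
    s"Duplicated column names found in table definition of $name: " +
    duplicateColumns.mkString("[", ",", "]"), tableHeader)
}
```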
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153062729 --- Diff: integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala --- @@ -21,10 +21,11 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer import org.apache.spark.SparkConf -import org.apache.spark.sql.{CarbonEnv, SparkSession} +import org.apache.spark.sql.{CarbonEnv, CarbonSession, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState} +import org.apache.spark.sql.hive.{CarbonRelation, HiveExternalCatalog} import org.apache.spark.sql.hive.HiveExternalCatalog._ +import org.apache.spark.sql.internal.SessionState --- End diff -- Remove unused import --- |
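Judging by where the review marker sits, the unused import is likely the newly added `org.apache.spark.sql.internal.SessionState`. Under that assumption, the cleaned-up import block would read:

```scala
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

import org.apache.spark.SparkConf
import org.apache.spark.sql.{CarbonEnv, CarbonSession, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hive.{CarbonRelation, HiveExternalCatalog}
import org.apache.spark.sql.hive.HiveExternalCatalog._
```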
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153062759 --- Diff: integration/spark2/src/main/scala/org/apache/spark/util/CarbonClassReflectionUtils.scala --- @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import scala.reflect.runtime._ +import scala.reflect.runtime.universe._ + +import org.apache.spark.SparkContext +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.parser.AstBuilder +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, SubqueryAlias} +import org.apache.spark.sql.internal.{SessionState, SQLConf} +import org.apache.spark.sql.parser.CarbonSpark2SqlParser +import org.apache.spark.util.Utils + +import org.apache.carbondata.common.logging.LogServiceFactory + +/** + * Reflection APIs + */ + +object CarbonClassReflectionUtils { + + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + private val rm = universe.runtimeMirror(getClass.getClassLoader) + + /** + * Returns the field val from a object through reflection. + * @param name - name of the field being retrieved. + * @param obj - Object from which the field has to be retrieved. + * @tparam T + * @return + */ + def getField[T: TypeTag: reflect.ClassTag](name: String, obj: T): Any = { + val im = rm.reflect(obj) + + im.symbol.typeSignature.members.find( + _.name.toString.equals(name)).map( + l => im.reflectField(l.asTerm).get.asInstanceOf[LogicalPlan] --- End diff -- Don't type cast, and don't duplicate the method. Keep the method in the common package --- |
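The cast is also unnecessary: the method's declared return type is `Any`, and `reflectField(...).get` already returns `Any`. A minimal cast-free sketch, keeping the signature and the object's runtime mirror `rm` from the diff:

```scala
private val rm = universe.runtimeMirror(getClass.getClassLoader)

// Same lookup as in the diff, minus the downcast: reflectField(...).get
// already yields Any, which matches the declared return type.
def getField[T: TypeTag: reflect.ClassTag](name: String, obj: T): Any = {
  val im = rm.reflect(obj)
  im.symbol.typeSignature.members
    .find(_.name.toString.equals(name))
    .map(m => im.reflectField(m.asTerm).get)
    .orNull
}
```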
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153062777 --- Diff: integration/spark2/src/main/scala/org/apache/spark/util/CarbonClassReflectionUtils.scala --- @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import scala.reflect.runtime._ +import scala.reflect.runtime.universe._ + +import org.apache.spark.SparkContext +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.parser.AstBuilder +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, SubqueryAlias} +import org.apache.spark.sql.internal.{SessionState, SQLConf} +import org.apache.spark.sql.parser.CarbonSpark2SqlParser +import org.apache.spark.util.Utils + +import org.apache.carbondata.common.logging.LogServiceFactory + +/** + * Reflection APIs + */ + +object CarbonClassReflectionUtils { + + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + private val rm = universe.runtimeMirror(getClass.getClassLoader) + + /** + * Returns the field val from a object through reflection. + * @param name - name of the field being retrieved. + * @param obj - Object from which the field has to be retrieved. + * @tparam T + * @return + */ + def getField[T: TypeTag: reflect.ClassTag](name: String, obj: T): Any = { + val im = rm.reflect(obj) + + im.symbol.typeSignature.members.find( + _.name.toString.equals(name)).map( + l => im.reflectField(l.asTerm).get.asInstanceOf[LogicalPlan] + ).getOrElse(null) + } + + def hasField[T: TypeTag: reflect.ClassTag](name: String, obj: T): Boolean = { --- End diff -- remove this method --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153062806 --- Diff: integration/spark2/src/main/scala/org/apache/spark/util/CarbonClassReflectionUtils.scala --- @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import scala.reflect.runtime._ +import scala.reflect.runtime.universe._ + +import org.apache.spark.SparkContext +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.parser.AstBuilder +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, SubqueryAlias} +import org.apache.spark.sql.internal.{SessionState, SQLConf} +import org.apache.spark.sql.parser.CarbonSpark2SqlParser +import org.apache.spark.util.Utils + +import org.apache.carbondata.common.logging.LogServiceFactory + +/** + * Reflection APIs + */ + +object CarbonClassReflectionUtils { --- End diff -- Separate this into 2 classes: one common reflection class and one reflection util. Move as much code as possible to the common reflection util, and try to make the code more generic. --- |
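A rough shape of the requested split, with the version-independent plumbing in a common utility. This is a hypothetical sketch: the object name and package placement are placeholders, not the eventual CarbonData API.

```scala
import scala.reflect.runtime.universe

// Placeholder name; would live in a module shared by all Spark profiles.
object CommonReflectionUtils {
  private val rm = universe.runtimeMirror(getClass.getClassLoader)

  // Generic field lookup with no Spark types in its signature, so it can
  // sit in a common package; callers supply the field name and the target.
  def getField(name: String, obj: Any): Any = {
    val im = rm.reflect(obj)
    im.symbol.typeSignature.members
      .find(_.name.toString.equals(name))
      .map(m => im.reflectField(m.asTerm).get)
      .orNull
  }
}
```

The spark2 module would then keep only thin, Spark-typed wrappers around helpers like this.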
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153062831 --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala --- @@ -24,16 +24,18 @@ import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTemp import org.apache.spark.sql.catalyst.expressions.{PredicateSubquery, ScalarSubquery} import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.parser.SqlBaseParser.CreateTableContext import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias} -import org.apache.spark.sql.execution.SparkOptimizer +import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.optimizer.CarbonLateDecodeRule -import org.apache.spark.sql.parser.CarbonSparkSqlParser +import org.apache.spark.sql.parser.{CarbonHelperqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser} import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.util.CarbonProperties --- End diff -- Remove unused import --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153062848 --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala --- @@ -84,8 +86,8 @@ class CarbonSessionCatalog( var toRefreshRelation = false rtnRelation match { case SubqueryAlias(_, - LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _), - _) => + LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _), + _) => --- End diff -- Move this line up --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153062859 --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala --- @@ -107,15 +110,14 @@ class CarbonSessionCatalog( carbonEnv.carbonMetastore. checkSchemasModifiedTimeAndReloadTables(storePath) - val tableMeta = carbonEnv.carbonMetastore - .getTableFromMetadataCache(carbonDatasourceHadoopRelation.carbonTable.getDatabaseName, - carbonDatasourceHadoopRelation.carbonTable.getFactTableName) - if (tableMeta.isEmpty || (tableMeta.isDefined && - tableMeta.get.carbonTable.getTableLastUpdatedTime != - carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) { + val table = carbonEnv.carbonMetastore.getTableFromMetadataCache( + carbonDatasourceHadoopRelation.carbonTable.getDatabaseName, + carbonDatasourceHadoopRelation.carbonTable.getFactTableName) + if (table.isEmpty || (table.isDefined && + table.get.carbonTable.getTableLastUpdatedTime != --- End diff -- wrong indentation --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153064184 --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala --- @@ -133,7 +134,9 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession) experimentalMethods.extraStrategies = - Seq(new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession)) + Seq(new CarbonLateDecodeStrategy, + new DDLStrategy(sparkSession) + ) --- End diff -- Keep the old format: `Seq(new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession))` --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153064193 --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala --- @@ -150,9 +153,8 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp DataSourceAnalysis(conf) :: (if (conf.runSQLonFile) { new ResolveDataSource(sparkSession) :: Nil - } else { - Nil - }) + } else { Nil } + ) --- End diff -- Keep the old format --- |
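For reference, the removed lines of the diff show the original formatting being asked for (indentation approximated, since the quoted diff is flattened here):

```scala
DataSourceAnalysis(conf) ::
(if (conf.runSQLonFile) {
  new ResolveDataSource(sparkSession) :: Nil
} else {
  Nil
})
```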
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153064203 --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala --- @@ -207,3 +213,26 @@ class CarbonOptimizer( super.execute(transFormedPlan) } } + +class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) extends + SparkSqlAstBuilder(conf) { + + val helper = new CarbonHelperqlAstBuilder(conf, parser) + + override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = { + val fileStorage = helper.getFileStorage(ctx.createFileFormat) + + if (fileStorage.equalsIgnoreCase("'carbondata'") || + fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) { + helper + .createCarbontable(ctx.createTableHeader, --- End diff -- move line up --- |
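Concretely, the fix is to keep the method name on the receiver's line and let only the arguments wrap. Everything after `ctx.createTableHeader` below is an assumption, inferred from the `createCarbontable` signature quoted earlier in this thread and from Spark 2.1's generated `CreateTableContext` accessors, since the quoted diff truncates after the first argument:

```scala
// Assumed shape of the corrected call; the accessors after
// createTableHeader are inferred, not taken from the PR.
helper.createCarbontable(
  ctx.createTableHeader,
  ctx.skewSpec,
  ctx.bucketSpec,
  ctx.partitionColumns,
  ctx.columns,
  ctx.tablePropertyList)
```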
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153064235 --- Diff: integration/spark2/src/main/spark2.2/CarbonSessionState.scala --- @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive + +import org.apache.hadoop.conf.Configuration +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} +import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.catalyst.expressions.ScalarSubquery +import org.apache.spark.sql.catalyst.optimizer.Optimizer +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateHiveTableContext, CreateTableContext} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation, PreWriteCheck, ResolveSQLOnFile, _} +import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy} +import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder} +import org.apache.spark.sql.hive.client.HiveClient +import org.apache.spark.sql.internal.{SQLConf, SessionState} +import org.apache.spark.sql.optimizer.CarbonLateDecodeRule +import org.apache.spark.sql.parser.{CarbonHelperqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser} + +import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier + +/** + * This class will have carbon catalog and refresh the relation from cache if the carbontable in + * carbon catalog is not same as cached carbon relation's carbon table + * + * @param externalCatalog + * @param globalTempViewManager + * @param sparkSession + * @param functionResourceLoader + * @param functionRegistry + * @param conf + * @param hadoopConf + */ +class CarbonSessionCatalog( + externalCatalog: HiveExternalCatalog, + globalTempViewManager: GlobalTempViewManager, + functionRegistry: FunctionRegistry, + sparkSession: SparkSession, + conf: SQLConf, + hadoopConf: Configuration, + parser: ParserInterface, + functionResourceLoader: FunctionResourceLoader) + extends HiveSessionCatalog( + externalCatalog, + globalTempViewManager, + new HiveMetastoreCatalog(sparkSession), + functionRegistry, + conf, + hadoopConf, + parser, + functionResourceLoader + ) { + + lazy val carbonEnv = { + val env = new CarbonEnv + env.init(sparkSession) + env + } + + def getCarbonEnv() : CarbonEnv = { + carbonEnv + } + + + private def refreshRelationFromCache(identifier: TableIdentifier, + carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = { + var isRefreshed = false + val storePath = CarbonEnv.getInstance(sparkSession).storePath + carbonEnv.carbonMetastore. + checkSchemasModifiedTimeAndReloadTables(storePath) + + val tableMeta = carbonEnv.carbonMetastore + .getTableFromMetadataCache(carbonDatasourceHadoopRelation.carbonTable.getDatabaseName, + carbonDatasourceHadoopRelation.carbonTable.getFactTableName) + if (tableMeta.isEmpty || (tableMeta.isDefined && + tableMeta.get.carbonTable.getTableLastUpdatedTime != + carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) { + refreshTable(identifier) + DataMapStoreManager.getInstance(). + clearDataMap(AbsoluteTableIdentifier.from(storePath, + identifier.database.getOrElse("default"), identifier.table)) + isRefreshed = true + logInfo(s"Schema changes have been detected for table: $identifier") + } + isRefreshed + } + + + override def lookupRelation(name: TableIdentifier): LogicalPlan = { + val rtnRelation = super.lookupRelation(name) + var toRefreshRelation = false + rtnRelation match { + case SubqueryAlias(_, + LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) => + toRefreshRelation = refreshRelationFromCache(name, carbonDatasourceHadoopRelation) + case LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) => + toRefreshRelation = refreshRelationFromCache(name, carbonDatasourceHadoopRelation) + case _ => + } + + if (toRefreshRelation) { + super.lookupRelation(name) + } else { + rtnRelation + } + } +} + +/** + * Session state implementation to override sql parser and adding strategies + * + * @param sparkSession + */ +class CarbonSessionStateBuilder(sparkSession: SparkSession, + parentState: Option[SessionState] = None) + extends HiveSessionStateBuilder(sparkSession, parentState) { + + override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession) + + experimentalMethods.extraStrategies = + Seq(new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession)) + experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule) + + /** + * Internal catalog for managing table and database states. + */ + /** + * Create a [[CarbonSessionCatalogBuild]]. + */ + override protected lazy val catalog: CarbonSessionCatalog = { + val catalog = new CarbonSessionCatalog( + externalCatalog, + session.sharedState.globalTempViewManager, + functionRegistry, + sparkSession, + conf, + SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf), + sqlParser, + resourceLoader) + parentState.foreach(_.catalog.copyStateTo(catalog)) + catalog + } + + private def externalCatalog: HiveExternalCatalog = + session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog] + + /** + * Create a Hive aware resource loader.
+ */ + override protected lazy val resourceLoader: HiveSessionResourceLoader = { + val client: HiveClient = externalCatalog.client.newSession() + new HiveSessionResourceLoader(session, client) + } + + override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods) + + override protected def analyzer: Analyzer = { + new Analyzer(catalog, conf) { + + override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = + new ResolveHiveSerdeTable(session) +: + new FindDataSourceTable(session) +: + new ResolveSQLOnFile(session) +: + new CarbonIUDAnalysisRule(sparkSession) +: + CarbonPreInsertionCasts +: customResolutionRules + + override val extendedCheckRules: Seq[LogicalPlan => Unit] = + PreWriteCheck :: HiveOnlyCheck :: Nil + + override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = + new DetermineTableStats(session) +: + RelationConversions(conf, catalog) +: + PreprocessTableCreation(session) +: + PreprocessTableInsertion(conf) +: + DataSourceAnalysis(conf) +: + HiveAnalysis +: + customPostHocResolutionRules + } + } + + override protected def newBuilder: NewBuilder = new CarbonSessionStateBuilder(_, _) + +} + + +class CarbonOptimizer( + catalog: SessionCatalog, + conf: SQLConf, + experimentalMethods: ExperimentalMethods) + extends SparkOptimizer(catalog, conf, experimentalMethods) { + + override def execute(plan: LogicalPlan): LogicalPlan = { + // In case scalar subquery add flag in relation to skip the decoder plan in optimizer rule, And + // optimize whole plan at once. + val transFormedPlan = plan.transform { + case filter: Filter => + filter.transformExpressions { + case s: ScalarSubquery => + val tPlan = s.plan.transform { + case lr: LogicalRelation + if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => + lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true + lr + } + ScalarSubquery(tPlan, s.children, s.exprId) + } + } + super.execute(transFormedPlan) + } +} + +class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) extends + SparkSqlAstBuilder(conf) { + + val helper = new CarbonHelperqlAstBuilder(conf, parser) + + override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = { + val fileStorage = helper.getFileStorage(ctx.createFileFormat) + + if (fileStorage.equalsIgnoreCase("'carbondata'") || + fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) { + helper + .createCarbontable(ctx.createTableHeader, --- End diff -- move line up --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153064253 --- Diff: integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala --- @@ -85,71 +85,75 @@ class AlterTableValidationTestCase extends Spark2QueryTest with BeforeAndAfterAl "('DICTIONARY_EXCLUDE'='nodict', 'DEFAULT.VALUE.NoDict'= 'abcd')") checkAnswer(sql("select distinct(nodict) from restructure"), Row("abcd")) } - test("test add timestamp no dictionary column") { - sql( - "alter table restructure add columns(tmpstmp timestamp) TBLPROPERTIES ('DEFAULT.VALUE" + - ".tmpstmp'= '17-01-2007')") - checkAnswer(sql("select distinct(tmpstmp) from restructure"), - Row(new java.sql.Timestamp(107, 0, 17, 0, 0, 0, 0))) - checkExistence(sql("desc restructure"), true, "tmpstmptimestamp") - } - - test("test add timestamp direct dictionary column") { - sql( - "alter table restructure add columns(tmpstmp1 timestamp) TBLPROPERTIES ('DEFAULT.VALUE" + - ".tmpstmp1'= '17-01-3007','DICTIONARY_INCLUDE'='tmpstmp1')") - checkAnswer(sql("select distinct(tmpstmp1) from restructure"), - Row(null)) - checkExistence(sql("desc restructure"), true, "tmpstmptimestamp") - } - - test("test add timestamp column and load as dictionary") { - sql("create table table1(name string) stored by 'carbondata'") - sql("insert into table1 select 'abc'") - sql("alter table table1 add columns(tmpstmp timestamp) TBLPROPERTIES " + - "('DEFAULT.VALUE.tmpstmp'='17-01-3007','DICTIONARY_INCLUDE'= 'tmpstmp')") - sql("insert into table1 select 'name','17-01-2007'") - checkAnswer(sql("select * from table1"), - Seq(Row("abc",null), - Row("name",Timestamp.valueOf("2007-01-17 00:00:00.0")))) - } - - test("test add msr column") { - sql( - "alter table restructure add columns(msrField decimal(5,2))TBLPROPERTIES ('DEFAULT.VALUE" + - ".msrfield'= '123.45')") - checkExistence(sql("desc restructure"), true, "msrfielddecimal(5,2)") - val output = sql("select msrField from restructure").collect - checkAnswer(sql("select distinct(msrField) from restructure"), - Row(new BigDecimal("123.45").setScale(2, RoundingMode.HALF_UP))) - } - - test("test add all datatype supported dictionary column") { - sql( - "alter table restructure add columns(strfld string, datefld date, tptfld timestamp, " + - "shortFld smallInt, " + - "intFld int, longFld bigint, dblFld double,dcml decimal(5,4))TBLPROPERTIES" + - "('DICTIONARY_INCLUDE'='datefld,shortFld,intFld,longFld,dblFld,dcml', 'DEFAULT.VALUE" + - ".dblFld'= '12345')") - checkAnswer(sql("select distinct(dblFld) from restructure"), - Row(java.lang.Double.parseDouble("12345"))) - checkExistence(sql("desc restructure"), true, "strfldstring") - checkExistence(sql("desc restructure"), true, "dateflddate") - checkExistence(sql("desc restructure"), true, "tptfldtimestamp") - checkExistence(sql("desc restructure"), true, "shortfldsmallint") - checkExistence(sql("desc restructure"), true, "intfldint") - checkExistence(sql("desc restructure"), true, "longfldbigint") - checkExistence(sql("desc restructure"), true, "dblflddouble") - checkExistence(sql("desc restructure"), true, "dcmldecimal(5,4)") - } - - test( - "test add decimal without scale and precision, default precision and scale (10,0) should be " + - "used") - { - sql("alter table restructure add columns(dcmldefault decimal)") - checkExistence(sql("desc restructure"), true, "dcmldefaultdecimal(10,0)") - } +// test("test add timestamp no dictionary column") { --- End diff -- Don't comment out any tests, just ignore them. They should be fixed soon. --- |
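Since these suites are written with ScalaTest, the non-destructive way to disable a test is to swap `test` for `ignore`: the body keeps compiling and the run reports the case as ignored. Applied to the first flagged test from the diff:

```scala
// Swap test(...) for ignore(...) rather than commenting the body out;
// ScalaTest still compiles the body and reports the case as ignored.
ignore("test add timestamp no dictionary column") {
  sql(
    "alter table restructure add columns(tmpstmp timestamp) TBLPROPERTIES ('DEFAULT.VALUE" +
    ".tmpstmp'= '17-01-2007')")
  checkAnswer(sql("select distinct(tmpstmp) from restructure"),
    Row(new java.sql.Timestamp(107, 0, 17, 0, 0, 0, 0)))
  checkExistence(sql("desc restructure"), true, "tmpstmptimestamp")
}
```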
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153064274 --- Diff: pom.xml --- @@ -509,6 +501,8 @@ <sourceDirectory>${basedir}/integration/hive/src/main/java</sourceDirectory> <sourceDirectory>${basedir}/integration/presto/src/main/scala</sourceDirectory> <sourceDirectory>${basedir}/integration/presto/src/main/java</sourceDirectory> + <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory> + <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory> --- End diff -- why are these added? --- |
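The two added lines are byte-for-byte identical, so at least one is redundant. One guess (purely an assumption, not something the PR states) is that the second entry was meant to cover the streaming module's Scala sources:

```xml
<!-- De-duplicated; the scala path is an assumed intent, not from the PR. -->
<sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
<sourceDirectory>${basedir}/streaming/src/main/scala</sourceDirectory>
```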
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153064291 --- Diff: pom.xml --- @@ -554,6 +553,8 @@ <sourceDirectory>${basedir}/integration/hive/src/main/java</sourceDirectory> <sourceDirectory>${basedir}/integration/presto/src/main/scala</sourceDirectory> <sourceDirectory>${basedir}/integration/presto/src/main/java</sourceDirectory> + <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory> + <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory> --- End diff -- why are these added? --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1469 Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/377/ --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1469 @sounakr Please change the branch to master. --- |
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153076314 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala --- @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.execution.command.DescribeTableCommand +import org.apache.spark.sql.types.DataType + +/** + * This class contains the wrappers of all the case classes which are common + * across spark version 2.1 and 2.2 but have change in parameter list. + * Below are the overriden unapply methods in order to make it work + * across both the version of spark2.1 and spark 2.2 + */ +object CarbonExpressions { + + /** + * unapply method of Cast class. + */ + object MatchCast { + def unapply(expr: Expression): Option[(Attribute, DataType)] = { + if (expr.isInstanceOf[Cast] && expr.asInstanceOf[Cast].child.isInstanceOf[Attribute]) { --- End diff -- Done. --- |
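The `isInstanceOf`/`asInstanceOf` chain reads more naturally as a pattern match, which is presumably the shape of the fix (a sketch, not the committed code). Matching on the type alone, rather than on `Cast`'s constructor, keeps the extractor working on both Spark 2.1 and 2.2, where the constructor arity differs:

```scala
object MatchCast {
  def unapply(expr: Expression): Option[(Attribute, DataType)] = expr match {
    // Type-only match: Cast gained a time-zone constructor parameter in
    // Spark 2.2, so a constructor pattern would be version-specific.
    case cast: Cast =>
      cast.child match {
        case attr: Attribute => Some((attr, attr.dataType))
        case _ => None
      }
    case _ => None
  }
}
```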
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153076321 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala --- @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.execution.command.DescribeTableCommand +import org.apache.spark.sql.types.DataType + +/** + * This class contains the wrappers of all the case classes which are common + * across spark version 2.1 and 2.2 but have change in parameter list. + * Below are the overriden unapply methods in order to make it work + * across both the version of spark2.1 and spark 2.2 + */ +object CarbonExpressions { + + /** + * unapply method of Cast class. + */ + object MatchCast { + def unapply(expr: Expression): Option[(Attribute, DataType)] = { + if (expr.isInstanceOf[Cast] && expr.asInstanceOf[Cast].child.isInstanceOf[Attribute]) { + Some((expr.asInstanceOf[Cast].child.asInstanceOf[Attribute], expr.asInstanceOf[Cast].child + .dataType)) + } else { + None + } + } + } + + /** + * unapply method of Describe Table format. + */ + object CarbonDescribeTable { + def unapply(plan: LogicalPlan): Option[(TableIdentifier, TablePartitionSpec, Boolean)] = { + if (plan.isInstanceOf[DescribeTableCommand]) { --- End diff -- Done --- |
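Same idea for the describe-table wrapper. The inner `isInstanceOf` checks in the original are tautological for non-null fields, since `DescribeTableCommand` already declares exactly those types; a pattern-match sketch of the likely fix:

```scala
object CarbonDescribeTable {
  def unapply(plan: LogicalPlan): Option[(TableIdentifier, TablePartitionSpec, Boolean)] =
    plan match {
      // The field-level type checks can be dropped: the case class already
      // guarantees table, partitionSpec and isExtended have these types.
      case desc: DescribeTableCommand =>
        Some((desc.table, desc.partitionSpec, desc.isExtended))
      case _ => None
    }
}
```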
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153076325 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala --- @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.execution.command.DescribeTableCommand +import org.apache.spark.sql.types.DataType + +/** + * This class contains the wrappers of all the case classes which are common + * across spark version 2.1 and 2.2 but have change in parameter list. + * Below are the overriden unapply methods in order to make it work + * across both the version of spark2.1 and spark 2.2 + */ +object CarbonExpressions { + + /** + * unapply method of Cast class. + */ + object MatchCast { + def unapply(expr: Expression): Option[(Attribute, DataType)] = { + if (expr.isInstanceOf[Cast] && expr.asInstanceOf[Cast].child.isInstanceOf[Attribute]) { + Some((expr.asInstanceOf[Cast].child.asInstanceOf[Attribute], expr.asInstanceOf[Cast].child + .dataType)) + } else { + None + } + } + } + + /** + * unapply method of Describe Table format. + */ + object CarbonDescribeTable { + def unapply(plan: LogicalPlan): Option[(TableIdentifier, TablePartitionSpec, Boolean)] = { + if (plan.isInstanceOf[DescribeTableCommand]) { + val describeTableCommand = plan.asInstanceOf[DescribeTableCommand] + if (describeTableCommand.table.isInstanceOf[TableIdentifier] && + describeTableCommand.partitionSpec.isInstanceOf[TablePartitionSpec] && + describeTableCommand.isExtended.isInstanceOf[Boolean]) { + Some(describeTableCommand.table, + describeTableCommand.partitionSpec, + describeTableCommand.isExtended) + } else { + None + } + } else { + None + } + } + } + + /** + * unapply method of SubqueryAlias. + */ + object CarbonSubqueryAlias { + def unapply(plan: LogicalPlan): Option[(String, LogicalPlan)] = { + if (plan.isInstanceOf[SubqueryAlias]) { --- End diff -- Done --- |
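And for `SubqueryAlias`, reading the shared `alias` and `child` fields instead of destructuring keeps the extractor version-neutral: the diffs earlier in this thread match `SubqueryAlias` with three parameters on Spark 2.1 but two on Spark 2.2. A sketch:

```scala
object CarbonSubqueryAlias {
  def unapply(plan: LogicalPlan): Option[(String, LogicalPlan)] = plan match {
    // Field access instead of a constructor pattern, because the number of
    // SubqueryAlias parameters differs between Spark 2.1 and 2.2.
    case alias: SubqueryAlias => Some((alias.alias, alias.child))
    case _ => None
  }
}
```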