Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153094548 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala --- @@ -143,52 +168,63 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica selectPlan } val finalPlan = if (filter.length > 0) { - val alias = table.alias.getOrElse("") var transformed: Boolean = false // Create a dummy projection to include filter conditions var newPlan: LogicalPlan = null if (table.tableIdentifier.database.isDefined) { newPlan = parser.parsePlan("select * from " + - table.tableIdentifier.database.getOrElse("") + "." + - table.tableIdentifier.table + " " + alias + " " + filter) + table.tableIdentifier.database.getOrElse("") + "." + + table.tableIdentifier.table + " " + alias.getOrElse("") + " " + + filter) } else { newPlan = parser.parsePlan("select * from " + - table.tableIdentifier.table + " " + alias + " " + filter) + table.tableIdentifier.table + " " + alias.getOrElse("") + " " + + filter) } newPlan transform { - case UnresolvedRelation(t, Some(a)) - if !transformed && t == table.tableIdentifier && a == alias => + case CarbonUnresolvedRelation(t) + if !transformed && t == table.tableIdentifier => transformed = true - // Add the filter condition of update statement on destination table - SubqueryAlias(alias, updatedSelectPlan, Option(table.tableIdentifier)) + + val subqueryAlias = CarbonClassReflectionUtils + .getSubqueryAlias(sparkSession, alias, updatedSelectPlan, Some(table.tableIdentifier)) + subqueryAlias } } else { updatedSelectPlan } val tid = CarbonTableIdentifierImplicit.toTableIdentifier(Seq(table.tableIdentifier.toString())) val tidSeq = Seq(GetDB.getDatabaseName(tid.database, sparkSession)) - val destinationTable = UnresolvedRelation(table.tableIdentifier, table.alias) + val destinationTable = CarbonClassReflectionUtils + .getUnresolvedRelation(table.tableIdentifier, alias) --- End diff -- Done --- |
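A note on the CarbonClassReflectionUtils.getSubqueryAlias call introduced in this hunk: SubqueryAlias takes (alias, child, view) in Spark 2.1 but only (alias, child) in Spark 2.2, so the node cannot be constructed statically by code compiled against both versions. Below is a minimal sketch of that idea using plain constructor reflection; SubqueryAliasShim and create are illustrative names, not the actual Carbon helper, which may resolve the constructor differently.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    object SubqueryAliasShim {
      // Build a SubqueryAlias without referencing its constructor at compile time,
      // picking the arity that matches the running Spark version.
      def create(
          sparkSession: SparkSession,
          alias: Option[String],
          child: LogicalPlan,
          view: Option[TableIdentifier]): LogicalPlan = {
        val clazz = Class.forName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias")
        val name = alias.getOrElse("")
        if (sparkSession.version.startsWith("2.1")) {
          // Spark 2.1.x: SubqueryAlias(alias: String, child: LogicalPlan, view: Option[TableIdentifier])
          clazz.getConstructors.find(_.getParameterCount == 3).get
            .newInstance(name, child, view).asInstanceOf[LogicalPlan]
        } else {
          // Spark 2.2.x: SubqueryAlias(alias: String, child: LogicalPlan)
          clazz.getConstructors.find(_.getParameterCount == 2).get
            .newInstance(name, child).asInstanceOf[LogicalPlan]
        }
      }
    }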
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153094607 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala --- @@ -143,52 +168,63 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica selectPlan } val finalPlan = if (filter.length > 0) { - val alias = table.alias.getOrElse("") var transformed: Boolean = false // Create a dummy projection to include filter conditions var newPlan: LogicalPlan = null if (table.tableIdentifier.database.isDefined) { newPlan = parser.parsePlan("select * from " + - table.tableIdentifier.database.getOrElse("") + "." + - table.tableIdentifier.table + " " + alias + " " + filter) + table.tableIdentifier.database.getOrElse("") + "." + + table.tableIdentifier.table + " " + alias.getOrElse("") + " " + + filter) } else { newPlan = parser.parsePlan("select * from " + - table.tableIdentifier.table + " " + alias + " " + filter) + table.tableIdentifier.table + " " + alias.getOrElse("") + " " + + filter) } newPlan transform { - case UnresolvedRelation(t, Some(a)) - if !transformed && t == table.tableIdentifier && a == alias => + case CarbonUnresolvedRelation(t) + if !transformed && t == table.tableIdentifier => transformed = true - // Add the filter condition of update statement on destination table - SubqueryAlias(alias, updatedSelectPlan, Option(table.tableIdentifier)) + + val subqueryAlias = CarbonClassReflectionUtils + .getSubqueryAlias(sparkSession, alias, updatedSelectPlan, Some(table.tableIdentifier)) + subqueryAlias } } else { updatedSelectPlan } val tid = CarbonTableIdentifierImplicit.toTableIdentifier(Seq(table.tableIdentifier.toString())) val tidSeq = Seq(GetDB.getDatabaseName(tid.database, sparkSession)) - val destinationTable = UnresolvedRelation(table.tableIdentifier, table.alias) + val destinationTable = CarbonClassReflectionUtils + .getUnresolvedRelation(table.tableIdentifier, alias) + ProjectForUpdate(destinationTable, columns, Seq(finalPlan)) } - def processDeleteRecordsQuery(selectStmt: String, table: UnresolvedRelation): LogicalPlan = { - val tidSeq = Seq(GetDB.getDatabaseName(table.tableIdentifier.database, sparkSession), - table.tableIdentifier.table) + + def processDeleteRecordsQuery(selectStmt: String, + alias: Option[String], + table: UnresolvedRelation): LogicalPlan = { + val tidSeq = Seq(GetDB.getDatabaseName(table.tableIdentifier.database, sparkSession), + table.tableIdentifier.table) var addedTupleId = false val parsePlan = parser.parsePlan(selectStmt) + val selectPlan = parsePlan transform { case relation: UnresolvedRelation if table.tableIdentifier == relation.tableIdentifier && !addedTupleId => addedTupleId = true val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId", Seq.empty, isDistinct = false), "tupleId")()) - val alias = table.alias match { - case Some(alias) => Some(table.alias.toSeq) + + val localalias = alias match { --- End diff -- Done --- |
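On the getTupleId handling near the end of this hunk: the delete flow rewrites the parsed SELECT so that each matching row also carries the implicit tuple id that later identifies it in the destination table. A condensed, standalone sketch of that rewrite is below; it assumes a bare UnresolvedRelation match and skips the alias and database checks shown above, and addTupleIdProjection is an illustrative name.

    import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
    import org.apache.spark.sql.catalyst.expressions.Alias
    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}

    // Wrap the first matching unresolved relation in a projection that keeps all
    // original columns and adds the internal "tupleId" expression.
    def addTupleIdProjection(plan: LogicalPlan): LogicalPlan = {
      var added = false
      plan transform {
        case relation: UnresolvedRelation if !added =>
          added = true
          val tupleId = UnresolvedAlias(
            Alias(UnresolvedFunction("getTupleId", Seq.empty, isDistinct = false), "tupleId")())
          Project(Seq(UnresolvedAlias(UnresolvedStar(None)), tupleId), relation)
      }
    }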
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153094754

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---
@@ -201,8 +237,10 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
   override def apply(logicalplan: LogicalPlan): LogicalPlan = {
     logicalplan transform {
-      case UpdateTable(t, cols, sel, where) => processUpdateQuery(t, cols, sel, where)
-      case DeleteRecords(statement, table) => processDeleteRecordsQuery(statement, table)
+      case UpdateTable(t, cols, sel, alias, where) => processUpdateQuery(t, cols, sel, alias, where)
+      case DeleteRecords(statement, alias, table) => processDeleteRecordsQuery(statement,
+        alias,
+        table)
--- End diff --

Done

---
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153094856

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---
@@ -115,18 +121,37 @@ class CarbonFileMetastore extends CarbonMetaStore {
     lookupRelation(TableIdentifier(tableName, dbName))(sparkSession)
   }
+  val rm = universe.runtimeMirror(getClass.getClassLoader)
+
+  def getField[T: TypeTag : reflect.ClassTag](name: String, obj: T): CatalogTable = {
--- End diff --

Ok

---
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153095079

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---
@@ -115,18 +121,37 @@ class CarbonFileMetastore extends CarbonMetaStore {
     lookupRelation(TableIdentifier(tableName, dbName))(sparkSession)
   }
+  val rm = universe.runtimeMirror(getClass.getClassLoader)
+
+  def getField[T: TypeTag : reflect.ClassTag](name: String, obj: T): CatalogTable = {
+    val im = rm.reflect(obj)
+    val sym = im.symbol.typeSignature.member(TermName(name))
+    val tableMeta = im.reflectMethod(sym.asMethod).apply()
+    tableMeta.asInstanceOf[CatalogTable]
--- End diff --

Ok

---
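The getField accessor above goes through runtime reflection because the class that carries tableMeta differs across Spark versions and cannot be referenced at compile time. The following self-contained toy shows the same scala.reflect mechanism; Example and readMember are made-up names standing in for the Spark types.

    import scala.reflect.ClassTag
    import scala.reflect.runtime.universe._

    case class Example(tableMeta: String)

    object ReflectDemo {
      private val rm = runtimeMirror(getClass.getClassLoader)

      // Resolve a member by name on an arbitrary object and invoke its accessor.
      def readMember[T: TypeTag : ClassTag](name: String, obj: T): Any = {
        val im = rm.reflect(obj)
        val sym = im.symbol.typeSignature.member(TermName(name))
        im.reflectMethod(sym.asMethod).apply()
      }

      def main(args: Array[String]): Unit = {
        println(readMember("tableMeta", Example("meta-value")))  // prints "meta-value"
      }
    }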
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153095100

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---
@@ -115,18 +121,37 @@ class CarbonFileMetastore extends CarbonMetaStore {
     lookupRelation(TableIdentifier(tableName, dbName))(sparkSession)
   }
+  val rm = universe.runtimeMirror(getClass.getClassLoader)
+
+  def getField[T: TypeTag : reflect.ClassTag](name: String, obj: T): CatalogTable = {
+    val im = rm.reflect(obj)
+    val sym = im.symbol.typeSignature.member(TermName(name))
+    val tableMeta = im.reflectMethod(sym.asMethod).apply()
+    tableMeta.asInstanceOf[CatalogTable]
+  }
+
   override def lookupRelation(tableIdentifier: TableIdentifier)
     (sparkSession: SparkSession): LogicalPlan = {
     val database = tableIdentifier.database.getOrElse(
       sparkSession.catalog.currentDatabase)
     val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
       case SubqueryAlias(_,
-        LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
-        _) =>
+        LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) =>
         carbonDatasourceHadoopRelation.carbonRelation
       case LogicalRelation(
           carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
         carbonDatasourceHadoopRelation.carbonRelation
+      case SubqueryAlias(_, c: CatalogRelation) if sparkSession.version.contains("2.2") &&
+        getField("tableMeta", c)
+          .asInstanceOf[CatalogTable].provider
+          .isDefined &&
+        getField("tableMeta", c)
+          .asInstanceOf[CatalogTable].provider.get
+          .equals("org.apache.spark.sql.CarbonSource") =>
--- End diff --

Done

---
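For readability, the guard above invokes the reflective getField twice on the same relation. One possible way to fold the check into a single helper is sketched below; isCarbonSourceTable is an illustrative name and relies on the getField method shown in this hunk.

    import org.apache.spark.sql.catalyst.catalog.CatalogRelation

    // A Spark 2.2 catalog relation is treated as a Carbon table when the
    // CatalogTable it wraps was created with the CarbonSource provider.
    def isCarbonSourceTable(c: CatalogRelation): Boolean =
      getField("tableMeta", c).provider.contains("org.apache.spark.sql.CarbonSource")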
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153095114

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala ---
@@ -153,8 +153,11 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
     val dbName = oldTableIdentifier.getDatabaseName
     val tableName = oldTableIdentifier.getTableName
     val schemaParts = CarbonUtil.convertToMultiGsonStrings(wrapperTableInfo, "=", "'", "")
-    sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive.runSqlHive(
-      s"ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES($schemaParts)")
+    val hiveClient = sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
+      .asInstanceOf[HiveExternalCatalog].client
+    hiveClient.runSqlHive(s"ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES($schemaParts)")
+
+    sparkSession.sessionState
--- End diff --

Removed

---
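The replacement path obtains the Hive client from the shared external catalog, which exists on both Spark 2.1 and 2.2, instead of the 2.1-only CarbonSessionState.metadataHive. A minimal sketch of that pattern follows; it is declared in the org.apache.spark.sql.hive package (as the original file is) because the client field is not visible from outside Spark's packages, and the object and method names are illustrative.

    package org.apache.spark.sql.hive

    import org.apache.spark.sql.SparkSession

    object HiveSerdeUpdate {
      // Run a metastore-side ALTER TABLE through the shared Hive client.
      // schemaParts is whatever serde key/value string the caller has prepared.
      def alterSerdeProperties(
          sparkSession: SparkSession,
          dbName: String,
          tableName: String,
          schemaParts: String): Unit = {
        val hiveClient = sparkSession.sharedState.externalCatalog
          .asInstanceOf[HiveExternalCatalog].client
        hiveClient.runSqlHive(
          s"ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES($schemaParts)")
      }
    }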
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153095139

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala ---
@@ -172,7 +172,13 @@ case class CarbonRelation(
   }
   // TODO: Use data from the footers.
-  override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes)
+  // TODO For 2.1
+  // override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes)
+  // Todo for 2.2
+  // override def computeStats(conf: SQLConf): Statistics = Statistics(sizeInBytes =
+  // this.sizeInBytes)
+
+  // override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes)
--- End diff --

Will resolve it as soon as possible.

---
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153095165 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala --- @@ -32,76 +32,6 @@ class CarbonSQLConf(sparkSession: SparkSession) { /** * To initialize dynamic param defaults along with usage docs */ - def addDefaultCarbonParams(): Unit = { - val ENABLE_UNSAFE_SORT = - SQLConfigBuilder(CarbonCommonConstants.ENABLE_UNSAFE_SORT) - .doc("To enable/ disable unsafe sort.") - .booleanConf - .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, - CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean) - val CARBON_CUSTOM_BLOCK_DISTRIBUTION = - SQLConfigBuilder(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION) - .doc("To enable/ disable carbon custom block distribution.") - .booleanConf - .createWithDefault(carbonProperties - .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION, - CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean) - val BAD_RECORDS_LOGGER_ENABLE = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE) - .doc("To enable/ disable carbon bad record logger.") - .booleanConf - .createWithDefault(CarbonLoadOptionConstants - .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean) - val BAD_RECORDS_ACTION = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION) - .doc("To configure the bad records action.") - .stringConf - .createWithDefault(carbonProperties - .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, - CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)) - val IS_EMPTY_DATA_BAD_RECORD = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD) - .doc("Property to decide weather empty data to be considered bad/ good record.") - .booleanConf - .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT - .toBoolean) - val SORT_SCOPE = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE) - .doc("Property to specify sort scope.") - .stringConf - .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, - CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)) - val BATCH_SORT_SIZE_INMB = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB) - .doc("Property to specify batch sort size in MB.") - .stringConf - .createWithDefault(carbonProperties - .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, - CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)) - val SINGLE_PASS = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS) - .doc("Property to enable/disable single_pass.") - .booleanConf - .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean) - val BAD_RECORD_PATH = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH) - .doc("Property to configure the bad record location.") - .stringConf - .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, - CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)) - val GLOBAL_SORT_PARTITIONS = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS) - .doc("Property to configure the global sort partitions.") - .stringConf - .createWithDefault(carbonProperties - .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS, - CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT)) - val 
DATEFORMAT = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT) - .doc("Property to configure data format for date type columns.") - .stringConf - .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT) - } --- End diff -- These are not being referred any more --- |
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153095183

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala ---
@@ -38,7 +38,6 @@ import org.apache.carbondata.core.stats.QueryStatistic
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation

-
--- End diff --

Ok

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153095637

--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala ---
@@ -34,7 +34,7 @@ class TestDescribeTable extends QueryTest with BeforeAndAfterAll {
     sql("CREATE TABLE Desc2(Dec2Col1 BigInt, Dec2Col2 String, Dec2Col3 Bigint, Dec2Col4 Decimal) stored by 'carbondata'")
   }
-  test("test describe table") {
+  ignore("test describe table") {
--- End diff --

Why ignore this?

---
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153095841

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---
@@ -177,7 +213,24 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
   protected lazy val table: Parser[UnresolvedRelation] = {
     rep1sep(attributeName, ".") ~ opt(ident) ^^ {
-      case tableIdent ~ alias => UnresolvedRelation(tableIdent, alias)
+      case tableIdent ~ alias => UnresolvedRelation(tableIdent)
     }
   }
+
+  protected lazy val aliasTable: Parser[(UnresolvedRelation, List[String], Option[String],
+    TableIdentifier)] = {
+    rep1sep(attributeName, ".") ~ opt(ident) ^^ {
+      case tableIdent ~ alias =>
+
+        val tableIdentifier: TableIdentifier = toTableIdentifier(tableIdent)
+        val localAlias: Option[String] = alias
--- End diff --

Done

---
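The aliasTable production above keeps the optional alias separate instead of folding it into UnresolvedRelation, whose alias parameter no longer exists in Spark 2.2. The standalone sketch below expresses the same grammar with plain scala-parser-combinators; AliasTableParser, name, and parse are illustrative stand-ins for attributeName and the surrounding CarbonDDLSqlParser machinery.

    import scala.util.parsing.combinator.JavaTokenParsers

    import org.apache.spark.sql.catalyst.TableIdentifier

    object AliasTableParser extends JavaTokenParsers {
      private val name: Parser[String] = ident

      // "db.table alias", "table alias", or just "table"
      val aliasTable: Parser[(TableIdentifier, Option[String])] =
        rep1sep(name, ".") ~ opt(name) ^^ {
          case parts ~ alias =>
            val tid = parts match {
              case tbl :: Nil       => TableIdentifier(tbl)
              case db :: tbl :: Nil => TableIdentifier(tbl, Some(db))
              case _ => throw new IllegalArgumentException("too many qualifier parts")
            }
            (tid, alias)
        }

      def parse(s: String): (TableIdentifier, Option[String]) = parseAll(aliasTable, s).get
    }

For example, AliasTableParser.parse("mydb.t1 a") yields (TableIdentifier("t1", Some("mydb")), Some("a")).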
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153095195

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---
@@ -17,19 +17,23 @@
 package org.apache.spark.sql.parser

+import java.lang.reflect.InvocationTargetException
--- End diff --

Ok

---
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153095958 --- Diff: integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala --- @@ -85,71 +85,75 @@ class AlterTableValidationTestCase extends Spark2QueryTest with BeforeAndAfterAl "('DICTIONARY_EXCLUDE'='nodict', 'DEFAULT.VALUE.NoDict'= 'abcd')") checkAnswer(sql("select distinct(nodict) from restructure"), Row("abcd")) } - test("test add timestamp no dictionary column") { - sql( - "alter table restructure add columns(tmpstmp timestamp) TBLPROPERTIES ('DEFAULT.VALUE" + - ".tmpstmp'= '17-01-2007')") - checkAnswer(sql("select distinct(tmpstmp) from restructure"), - Row(new java.sql.Timestamp(107, 0, 17, 0, 0, 0, 0))) - checkExistence(sql("desc restructure"), true, "tmpstmptimestamp") - } - - test("test add timestamp direct dictionary column") { - sql( - "alter table restructure add columns(tmpstmp1 timestamp) TBLPROPERTIES ('DEFAULT.VALUE" + - ".tmpstmp1'= '17-01-3007','DICTIONARY_INCLUDE'='tmpstmp1')") - checkAnswer(sql("select distinct(tmpstmp1) from restructure"), - Row(null)) - checkExistence(sql("desc restructure"), true, "tmpstmptimestamp") - } - - test("test add timestamp column and load as dictionary") { - sql("create table table1(name string) stored by 'carbondata'") - sql("insert into table1 select 'abc'") - sql("alter table table1 add columns(tmpstmp timestamp) TBLPROPERTIES " + - "('DEFAULT.VALUE.tmpstmp'='17-01-3007','DICTIONARY_INCLUDE'= 'tmpstmp')") - sql("insert into table1 select 'name','17-01-2007'") - checkAnswer(sql("select * from table1"), - Seq(Row("abc",null), - Row("name",Timestamp.valueOf("2007-01-17 00:00:00.0")))) - } - - test("test add msr column") { - sql( - "alter table restructure add columns(msrField decimal(5,2))TBLPROPERTIES ('DEFAULT.VALUE" + - ".msrfield'= '123.45')") - checkExistence(sql("desc restructure"), true, "msrfielddecimal(5,2)") - val output = sql("select msrField from restructure").collect - checkAnswer(sql("select distinct(msrField) from restructure"), - Row(new BigDecimal("123.45").setScale(2, RoundingMode.HALF_UP))) - } - - test("test add all datatype supported dictionary column") { - sql( - "alter table restructure add columns(strfld string, datefld date, tptfld timestamp, " + - "shortFld smallInt, " + - "intFld int, longFld bigint, dblFld double,dcml decimal(5,4))TBLPROPERTIES" + - "('DICTIONARY_INCLUDE'='datefld,shortFld,intFld,longFld,dblFld,dcml', 'DEFAULT.VALUE" + - ".dblFld'= '12345')") - checkAnswer(sql("select distinct(dblFld) from restructure"), - Row(java.lang.Double.parseDouble("12345"))) - checkExistence(sql("desc restructure"), true, "strfldstring") - checkExistence(sql("desc restructure"), true, "dateflddate") - checkExistence(sql("desc restructure"), true, "tptfldtimestamp") - checkExistence(sql("desc restructure"), true, "shortfldsmallint") - checkExistence(sql("desc restructure"), true, "intfldint") - checkExistence(sql("desc restructure"), true, "longfldbigint") - checkExistence(sql("desc restructure"), true, "dblflddouble") - checkExistence(sql("desc restructure"), true, "dcmldecimal(5,4)") - } - - test( - "test add decimal without scale and precision, default precision and scale (10,0) should be " + - "used") - { - sql("alter table restructure add columns(dcmldefault decimal)") - checkExistence(sql("desc restructure"), true, "dcmldefaultdecimal(10,0)") - } +// test("test add timestamp no dictionary column") { --- End diff -- Ok --- |
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153095978

--- Diff: pom.xml ---
@@ -509,6 +501,8 @@
         <sourceDirectory>${basedir}/integration/hive/src/main/java</sourceDirectory>
         <sourceDirectory>${basedir}/integration/presto/src/main/scala</sourceDirectory>
         <sourceDirectory>${basedir}/integration/presto/src/main/java</sourceDirectory>
+        <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
+        <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
--- End diff --

Removed

---
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153096000

--- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
@@ -207,3 +213,26 @@ class CarbonOptimizer(
     super.execute(transFormedPlan)
   }
 }
+
+class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) extends
+  SparkSqlAstBuilder(conf) {
+
+  val helper = new CarbonHelperqlAstBuilder(conf, parser)
+
+  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
+    val fileStorage = helper.getFileStorage(ctx.createFileFormat)
+
+    if (fileStorage.equalsIgnoreCase("'carbondata'") ||
+      fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
+      helper
+        .createCarbontable(ctx.createTableHeader,
--- End diff --

Done

---
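The builder above intercepts CREATE TABLE only when the STORED BY handler is CarbonData and otherwise defers to Spark's default AST builder. A hedged sketch of that dispatch against the Spark 2.1 parser is shown below (Spark 2.2 moves the Hive-style statement to visitCreateHiveTable, as the spark2.2 file in this PR does); getFileStorage here is a local stand-in for the Carbon helper method, and buildCarbonCreateTable is a hypothetical hook replacing helper.createCarbontable.

    import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateFileFormatContext, CreateTableContext}
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.SparkSqlAstBuilder
    import org.apache.spark.sql.internal.SQLConf

    class CarbonAwareAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {

      // Quoted storage handler from STORED BY, or empty when the clause is absent.
      private def getFileStorage(ctx: CreateFileFormatContext): String =
        if (ctx == null || ctx.storageHandler == null) "" else ctx.storageHandler.STRING.getText

      // Hypothetical hook standing in for helper.createCarbontable(...).
      protected def buildCarbonCreateTable(ctx: CreateTableContext): LogicalPlan = ???

      override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
        val fileStorage = getFileStorage(ctx.createFileFormat)
        if (fileStorage.equalsIgnoreCase("'carbondata'") ||
            fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
          buildCarbonCreateTable(ctx)
        } else {
          super.visitCreateTable(ctx)
        }
      }
    }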
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153096010

--- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
@@ -150,9 +153,8 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
         DataSourceAnalysis(conf) ::
         (if (conf.runSQLonFile) {
           new ResolveDataSource(sparkSession) :: Nil
-        } else {
-          Nil
-        })
+        } else { Nil }
+        )
--- End diff --

Ok

---
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153095995 --- Diff: integration/spark2/src/main/spark2.2/CarbonSessionState.scala --- @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive + +import org.apache.hadoop.conf.Configuration +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} +import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.catalyst.expressions.ScalarSubquery +import org.apache.spark.sql.catalyst.optimizer.Optimizer +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateHiveTableContext, CreateTableContext} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation, PreWriteCheck, ResolveSQLOnFile, _} +import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy} +import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder} +import org.apache.spark.sql.hive.client.HiveClient +import org.apache.spark.sql.internal.{SQLConf, SessionState} +import org.apache.spark.sql.optimizer.CarbonLateDecodeRule +import org.apache.spark.sql.parser.{CarbonHelperqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser} + +import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier + +/** + * This class will have carbon catalog and refresh the relation from cache if the carbontable in + * carbon catalog is not same as cached carbon relation's carbon table + * + * @param externalCatalog + * @param globalTempViewManager + * @param sparkSession + * @param functionResourceLoader + * @param functionRegistry + * @param conf + * @param hadoopConf + */ +class CarbonSessionCatalog( + externalCatalog: HiveExternalCatalog, + globalTempViewManager: GlobalTempViewManager, + functionRegistry: FunctionRegistry, + sparkSession: SparkSession, + conf: SQLConf, + hadoopConf: Configuration, + parser: ParserInterface, + functionResourceLoader: FunctionResourceLoader) + extends HiveSessionCatalog( + externalCatalog, + globalTempViewManager, + new HiveMetastoreCatalog(sparkSession), + functionRegistry, + conf, + hadoopConf, + parser, + functionResourceLoader + ) { + + lazy val carbonEnv = { + val env = new CarbonEnv + env.init(sparkSession) + env + } + + def getCarbonEnv() : CarbonEnv = { + carbonEnv + } + + + private def refreshRelationFromCache(identifier: TableIdentifier, + carbonDatasourceHadoopRelation: 
CarbonDatasourceHadoopRelation): Boolean = { + var isRefreshed = false + val storePath = CarbonEnv.getInstance(sparkSession).storePath + carbonEnv.carbonMetastore. + checkSchemasModifiedTimeAndReloadTables(storePath) + + val tableMeta = carbonEnv.carbonMetastore + .getTableFromMetadataCache(carbonDatasourceHadoopRelation.carbonTable.getDatabaseName, + carbonDatasourceHadoopRelation.carbonTable.getFactTableName) + if (tableMeta.isEmpty || (tableMeta.isDefined && + tableMeta.get.carbonTable.getTableLastUpdatedTime != + carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) { + refreshTable(identifier) + DataMapStoreManager.getInstance(). + clearDataMap(AbsoluteTableIdentifier.from(storePath, + identifier.database.getOrElse("default"), identifier.table)) + isRefreshed = true + logInfo(s"Schema changes have been detected for table: $identifier") + } + isRefreshed + } + + + override def lookupRelation(name: TableIdentifier): LogicalPlan = { + val rtnRelation = super.lookupRelation(name) + var toRefreshRelation = false + rtnRelation match { + case SubqueryAlias(_, + LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) => + toRefreshRelation = refreshRelationFromCache(name, carbonDatasourceHadoopRelation) + case LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) => + toRefreshRelation = refreshRelationFromCache(name, carbonDatasourceHadoopRelation) + case _ => + } + + if (toRefreshRelation) { + super.lookupRelation(name) + } else { + rtnRelation + } + } +} + +/** + * Session state implementation to override sql parser and adding strategies + * + * @param sparkSession + */ +class CarbonSessionStateBuilder(sparkSession: SparkSession, + parentState: Option[SessionState] = None) + extends HiveSessionStateBuilder(sparkSession, parentState) { + + override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession) + + experimentalMethods.extraStrategies = + Seq(new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession)) + experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule) + + /** + * Internal catalog for managing table and database states. + */ + /** + * Create a [[CarbonSessionCatalogBuild]]. + */ + override protected lazy val catalog: CarbonSessionCatalog = { + val catalog = new CarbonSessionCatalog( + externalCatalog, + session.sharedState.globalTempViewManager, + functionRegistry, + sparkSession, + conf, + SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf), + sqlParser, + resourceLoader) + parentState.foreach(_.catalog.copyStateTo(catalog)) + catalog + } + + private def externalCatalog: HiveExternalCatalog = + session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog] + + /** + * Create a Hive aware resource loader. 
+ */ + override protected lazy val resourceLoader: HiveSessionResourceLoader = { + val client: HiveClient = externalCatalog.client.newSession() + new HiveSessionResourceLoader(session, client) + } + + override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods) + + override protected def analyzer: Analyzer = { + new Analyzer(catalog, conf) { + + override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = + new ResolveHiveSerdeTable(session) +: + new FindDataSourceTable(session) +: + new ResolveSQLOnFile(session) +: + new CarbonIUDAnalysisRule(sparkSession) +: + CarbonPreInsertionCasts +: customResolutionRules + + override val extendedCheckRules: Seq[LogicalPlan => Unit] = + PreWriteCheck :: HiveOnlyCheck :: Nil + + override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = + new DetermineTableStats(session) +: + RelationConversions(conf, catalog) +: + PreprocessTableCreation(session) +: + PreprocessTableInsertion(conf) +: + DataSourceAnalysis(conf) +: + HiveAnalysis +: + customPostHocResolutionRules + } + } + + override protected def newBuilder: NewBuilder = new CarbonSessionStateBuilder(_, _) + +} + + +class CarbonOptimizer( + catalog: SessionCatalog, + conf: SQLConf, + experimentalMethods: ExperimentalMethods) + extends SparkOptimizer(catalog, conf, experimentalMethods) { + + override def execute(plan: LogicalPlan): LogicalPlan = { + // In case scalar subquery add flag in relation to skip the decoder plan in optimizer rule, And + // optimize whole plan at once. + val transFormedPlan = plan.transform { + case filter: Filter => + filter.transformExpressions { + case s: ScalarSubquery => + val tPlan = s.plan.transform { + case lr: LogicalRelation + if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => + lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true + lr + } + ScalarSubquery(tPlan, s.children, s.exprId) + } + } + super.execute(transFormedPlan) + } +} + +class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) extends + SparkSqlAstBuilder(conf) { + + val helper = new CarbonHelperqlAstBuilder(conf, parser) + + override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = { + val fileStorage = helper.getFileStorage(ctx.createFileFormat) + + if (fileStorage.equalsIgnoreCase("'carbondata'") || + fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) { + helper + .createCarbontable(ctx.createTableHeader, --- End diff -- Done --- |
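The lookupRelation override in this new file resolves the relation, compares the cached table's last-updated time against the store, and resolves again after invalidating stale entries. Its control flow, distilled into a standalone sketch whose three function parameters stand in for super.lookupRelation, the staleness check inside refreshRelationFromCache, and the refreshTable/clearDataMap pair:

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    def lookupWithRefresh(
        name: TableIdentifier,
        lookup: TableIdentifier => LogicalPlan,
        isStale: LogicalPlan => Boolean,
        invalidate: TableIdentifier => Unit): LogicalPlan = {
      val resolved = lookup(name)
      if (isStale(resolved)) {
        invalidate(name)  // drop the cached relation and data maps, then resolve again
        lookup(name)
      } else {
        resolved
      }
    }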
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153096022

--- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
@@ -133,7 +134,9 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
   override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
   experimentalMethods.extraStrategies =
-    Seq(new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession))
+    Seq(new CarbonLateDecodeStrategy,
+      new DDLStrategy(sparkSession)
+    )
--- End diff --

Done

---
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153096029

--- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
@@ -84,8 +86,8 @@ class CarbonSessionCatalog(
     var toRefreshRelation = false
     rtnRelation match {
       case SubqueryAlias(_,
-        LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
-        _) =>
+          LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
+          _) =>
--- End diff --

Done

---