Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153062164 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala --- @@ -95,12 +107,23 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica def prepareTargetReleation(relation: UnresolvedRelation): SubqueryAlias = { val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId", Seq.empty, isDistinct = false), "tupleId")()) + + val localAlias = alias match { + case Some(a) => Some(alias.toSeq) + case _ => None + } --- End diff -- Instead of pattern matching like this, you can simply use `alias.map(Seq(_))` in all places. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153062196 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala --- @@ -95,12 +107,23 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica def prepareTargetReleation(relation: UnresolvedRelation): SubqueryAlias = { val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId", Seq.empty, isDistinct = false), "tupleId")()) + + val localAlias = alias match { + case Some(a) => Some(alias.toSeq) + case _ => None + } val projList = Seq( - UnresolvedAlias(UnresolvedStar(Option(table.alias.toSeq))), tupleId) - // include tuple id and rest of the required columns in subqury - SubqueryAlias(table.alias.getOrElse(""), - Project(projList, relation), Option(table.tableIdentifier)) + UnresolvedAlias(UnresolvedStar(localAlias)), tupleId) + + val subqueryAlias = --- End diff -- No need to assign this to a variable; return the result directly. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153062213 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala --- @@ -95,12 +107,23 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica def prepareTargetReleation(relation: UnresolvedRelation): SubqueryAlias = { val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId", Seq.empty, isDistinct = false), "tupleId")()) + + val localAlias = alias match { + case Some(a) => Some(alias.toSeq) + case _ => None + } val projList = Seq( - UnresolvedAlias(UnresolvedStar(Option(table.alias.toSeq))), tupleId) - // include tuple id and rest of the required columns in subqury - SubqueryAlias(table.alias.getOrElse(""), - Project(projList, relation), Option(table.tableIdentifier)) + UnresolvedAlias(UnresolvedStar(localAlias)), tupleId) + + val subqueryAlias = + CarbonClassReflectionUtils + .getSubqueryAlias(sparkSession, + alias, + Project(projList, relation), + Some(table.tableIdentifier)) + subqueryAlias --- End diff -- Please format like this ``` CarbonClassReflectionUtils.getSubqueryAlias( sparkSession, alias, Project(projList, relation), Some(table.tableIdentifier)) ``` --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153062310 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala --- @@ -143,52 +246,250 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica selectPlan } val finalPlan = if (filter.length > 0) { - val alias = table.alias.getOrElse("") var transformed: Boolean = false // Create a dummy projection to include filter conditions var newPlan: LogicalPlan = null if (table.tableIdentifier.database.isDefined) { newPlan = parser.parsePlan("select * from " + - table.tableIdentifier.database.getOrElse("") + "." + - table.tableIdentifier.table + " " + alias + " " + filter) + table.tableIdentifier.database.getOrElse("") + "." + + table.tableIdentifier.table + " " + alias.getOrElse("") + " " + + filter) --- End diff -- Please format like old code --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153062338 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala --- @@ -143,52 +168,63 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica selectPlan } val finalPlan = if (filter.length > 0) { - val alias = table.alias.getOrElse("") var transformed: Boolean = false // Create a dummy projection to include filter conditions var newPlan: LogicalPlan = null if (table.tableIdentifier.database.isDefined) { newPlan = parser.parsePlan("select * from " + - table.tableIdentifier.database.getOrElse("") + "." + - table.tableIdentifier.table + " " + alias + " " + filter) + table.tableIdentifier.database.getOrElse("") + "." + + table.tableIdentifier.table + " " + alias.getOrElse("") + " " + + filter) } else { newPlan = parser.parsePlan("select * from " + - table.tableIdentifier.table + " " + alias + " " + filter) + table.tableIdentifier.table + " " + alias.getOrElse("") + " " + + filter) } newPlan transform { - case UnresolvedRelation(t, Some(a)) - if !transformed && t == table.tableIdentifier && a == alias => + case CarbonUnresolvedRelation(t) + if !transformed && t == table.tableIdentifier => transformed = true - // Add the filter condition of update statement on destination table - SubqueryAlias(alias, updatedSelectPlan, Option(table.tableIdentifier)) + + val subqueryAlias = CarbonClassReflectionUtils + .getSubqueryAlias(sparkSession, alias, updatedSelectPlan, Some(table.tableIdentifier)) + subqueryAlias --- End diff -- Don't assign to the variable, just return it. And format the code as below. ``` CarbonClassReflectionUtils.getSubqueryAlias( sparkSession, alias, updatedSelectPlan, Some(table.tableIdentifier)) ``` --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153062354 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala --- @@ -143,52 +168,63 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica selectPlan } val finalPlan = if (filter.length > 0) { - val alias = table.alias.getOrElse("") var transformed: Boolean = false // Create a dummy projection to include filter conditions var newPlan: LogicalPlan = null if (table.tableIdentifier.database.isDefined) { newPlan = parser.parsePlan("select * from " + - table.tableIdentifier.database.getOrElse("") + "." + - table.tableIdentifier.table + " " + alias + " " + filter) + table.tableIdentifier.database.getOrElse("") + "." + + table.tableIdentifier.table + " " + alias.getOrElse("") + " " + + filter) } else { newPlan = parser.parsePlan("select * from " + - table.tableIdentifier.table + " " + alias + " " + filter) + table.tableIdentifier.table + " " + alias.getOrElse("") + " " + + filter) } newPlan transform { - case UnresolvedRelation(t, Some(a)) - if !transformed && t == table.tableIdentifier && a == alias => + case CarbonUnresolvedRelation(t) + if !transformed && t == table.tableIdentifier => transformed = true - // Add the filter condition of update statement on destination table - SubqueryAlias(alias, updatedSelectPlan, Option(table.tableIdentifier)) + + val subqueryAlias = CarbonClassReflectionUtils + .getSubqueryAlias(sparkSession, alias, updatedSelectPlan, Some(table.tableIdentifier)) + subqueryAlias } } else { updatedSelectPlan } val tid = CarbonTableIdentifierImplicit.toTableIdentifier(Seq(table.tableIdentifier.toString())) val tidSeq = Seq(GetDB.getDatabaseName(tid.database, sparkSession)) - val destinationTable = UnresolvedRelation(table.tableIdentifier, table.alias) + val destinationTable = CarbonClassReflectionUtils + .getUnresolvedRelation(table.tableIdentifier, alias) --- End diff -- Format like below. 
``` val destinationTable = CarbonClassReflectionUtils.getUnresolvedRelation(table.tableIdentifier, alias) ``` --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153062364 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala --- @@ -143,52 +168,63 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica selectPlan } val finalPlan = if (filter.length > 0) { - val alias = table.alias.getOrElse("") var transformed: Boolean = false // Create a dummy projection to include filter conditions var newPlan: LogicalPlan = null if (table.tableIdentifier.database.isDefined) { newPlan = parser.parsePlan("select * from " + - table.tableIdentifier.database.getOrElse("") + "." + - table.tableIdentifier.table + " " + alias + " " + filter) + table.tableIdentifier.database.getOrElse("") + "." + + table.tableIdentifier.table + " " + alias.getOrElse("") + " " + + filter) } else { newPlan = parser.parsePlan("select * from " + - table.tableIdentifier.table + " " + alias + " " + filter) + table.tableIdentifier.table + " " + alias.getOrElse("") + " " + + filter) } newPlan transform { - case UnresolvedRelation(t, Some(a)) - if !transformed && t == table.tableIdentifier && a == alias => + case CarbonUnresolvedRelation(t) + if !transformed && t == table.tableIdentifier => transformed = true - // Add the filter condition of update statement on destination table - SubqueryAlias(alias, updatedSelectPlan, Option(table.tableIdentifier)) + + val subqueryAlias = CarbonClassReflectionUtils + .getSubqueryAlias(sparkSession, alias, updatedSelectPlan, Some(table.tableIdentifier)) + subqueryAlias } } else { updatedSelectPlan } val tid = CarbonTableIdentifierImplicit.toTableIdentifier(Seq(table.tableIdentifier.toString())) val tidSeq = Seq(GetDB.getDatabaseName(tid.database, sparkSession)) - val destinationTable = UnresolvedRelation(table.tableIdentifier, table.alias) + val destinationTable = CarbonClassReflectionUtils + .getUnresolvedRelation(table.tableIdentifier, alias) + ProjectForUpdate(destinationTable, columns, Seq(finalPlan)) } - 
def processDeleteRecordsQuery(selectStmt: String, table: UnresolvedRelation): LogicalPlan = { - val tidSeq = Seq(GetDB.getDatabaseName(table.tableIdentifier.database, sparkSession), - table.tableIdentifier.table) + + def processDeleteRecordsQuery(selectStmt: String, + alias: Option[String], + table: UnresolvedRelation): LogicalPlan = { + val tidSeq = Seq(GetDB.getDatabaseName(table.tableIdentifier.database, sparkSession), + table.tableIdentifier.table) var addedTupleId = false val parsePlan = parser.parsePlan(selectStmt) + val selectPlan = parsePlan transform { case relation: UnresolvedRelation if table.tableIdentifier == relation.tableIdentifier && !addedTupleId => addedTupleId = true val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId", Seq.empty, isDistinct = false), "tupleId")()) - val alias = table.alias match { - case Some(alias) => Some(table.alias.toSeq) + + val localalias = alias match { --- End diff -- use directly like below ``` alias.map(Seq(_)) ``` --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153062388 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala --- @@ -201,8 +237,10 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica override def apply(logicalplan: LogicalPlan): LogicalPlan = { logicalplan transform { - case UpdateTable(t, cols, sel, where) => processUpdateQuery(t, cols, sel, where) - case DeleteRecords(statement, table) => processDeleteRecordsQuery(statement, table) + case UpdateTable(t, cols, sel, alias, where) => processUpdateQuery(t, cols, sel, alias, where) + case DeleteRecords(statement, alias, table) => processDeleteRecordsQuery(statement, + alias, + table) --- End diff -- format it properly. like ``` case DeleteRecords(statement, alias, table) => processDeleteRecordsQuery( statement, alias, table) ``` --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153062410 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala --- @@ -115,18 +121,37 @@ class CarbonFileMetastore extends CarbonMetaStore { lookupRelation(TableIdentifier(tableName, dbName))(sparkSession) } + val rm = universe.runtimeMirror(getClass.getClassLoader) + + def getField[T: TypeTag : reflect.ClassTag](name: String, obj: T): CatalogTable = { --- End diff -- Move this method to utility --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153062424 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala --- @@ -115,18 +121,37 @@ class CarbonFileMetastore extends CarbonMetaStore { lookupRelation(TableIdentifier(tableName, dbName))(sparkSession) } + val rm = universe.runtimeMirror(getClass.getClassLoader) + + def getField[T: TypeTag : reflect.ClassTag](name: String, obj: T): CatalogTable = { + val im = rm.reflect(obj) + val sym = im.symbol.typeSignature.member(TermName(name)) + val tableMeta = im.reflectMethod(sym.asMethod).apply() + tableMeta.asInstanceOf[CatalogTable] + } + override def lookupRelation(tableIdentifier: TableIdentifier) (sparkSession: SparkSession): LogicalPlan = { val database = tableIdentifier.database.getOrElse( sparkSession.catalog.currentDatabase) val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match { case SubqueryAlias(_, - LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _), - _) => + LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) => carbonDatasourceHadoopRelation.carbonRelation case LogicalRelation( carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) => carbonDatasourceHadoopRelation.carbonRelation + case SubqueryAlias(_, c: CatalogRelation) if sparkSession.version.contains("2.2") && + getField("tableMeta", c) + .asInstanceOf[CatalogTable].provider + .isDefined && + getField("tableMeta", c) + .asInstanceOf[CatalogTable].provider.get + .equals("org.apache.spark.sql.CarbonSource") => --- End diff -- Indentation and format is wrong. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153062453 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala --- @@ -115,18 +121,37 @@ class CarbonFileMetastore extends CarbonMetaStore { lookupRelation(TableIdentifier(tableName, dbName))(sparkSession) } + val rm = universe.runtimeMirror(getClass.getClassLoader) + + def getField[T: TypeTag : reflect.ClassTag](name: String, obj: T): CatalogTable = { --- End diff -- Only one such method should exist; all callers should use that method from the utility. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153062458 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala --- @@ -115,18 +121,37 @@ class CarbonFileMetastore extends CarbonMetaStore { lookupRelation(TableIdentifier(tableName, dbName))(sparkSession) } + val rm = universe.runtimeMirror(getClass.getClassLoader) + + def getField[T: TypeTag : reflect.ClassTag](name: String, obj: T): CatalogTable = { + val im = rm.reflect(obj) + val sym = im.symbol.typeSignature.member(TermName(name)) + val tableMeta = im.reflectMethod(sym.asMethod).apply() + tableMeta.asInstanceOf[CatalogTable] --- End diff -- No need to typecast --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153062473 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala --- @@ -153,8 +153,11 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { val dbName = oldTableIdentifier.getDatabaseName val tableName = oldTableIdentifier.getTableName val schemaParts = CarbonUtil.convertToMultiGsonStrings(wrapperTableInfo, "=", "'", "") - sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive.runSqlHive( - s"ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES($schemaParts)") + val hiveClient = sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog + .asInstanceOf[HiveExternalCatalog].client + hiveClient.runSqlHive(s"ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES($schemaParts)") + + sparkSession.sessionState --- End diff -- unused code, remove it. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153062493 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala --- @@ -172,7 +172,13 @@ case class CarbonRelation( } // TODO: Use data from the footers. - override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes) + // TODO For 2.1 + // override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes) + // Todo for 2.2 + // override def computeStats(conf: SQLConf): Statistics = Statistics(sizeInBytes = + // this.sizeInBytes) + + // override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes) --- End diff -- It would be a big problem if we can't implement it. Please check it on priority --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153062511 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala --- @@ -38,7 +38,6 @@ import org.apache.carbondata.core.stats.QueryStatistic import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory import org.apache.carbondata.spark.CarbonAliasDecoderRelation - --- End diff -- Don't change the class unnecessarily --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153062519 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala --- @@ -17,19 +17,23 @@ package org.apache.spark.sql.parser +import java.lang.reflect.InvocationTargetException --- End diff -- Remove unused import --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153062611 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala --- @@ -177,7 +213,24 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { protected lazy val table: Parser[UnresolvedRelation] = { rep1sep(attributeName, ".") ~ opt(ident) ^^ { - case tableIdent ~ alias => UnresolvedRelation(tableIdent, alias) + case tableIdent ~ alias => UnresolvedRelation(tableIdent) + } + } + + protected lazy val aliasTable: Parser[(UnresolvedRelation, List[String], Option[String], + TableIdentifier)] = { + rep1sep(attributeName, ".") ~ opt(ident) ^^ { + case tableIdent ~ alias => + + val tableIdentifier: TableIdentifier = toTableIdentifier(tableIdent) + val localAlias: Option[String] = alias --- End diff -- This localAlias is not required --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153062627 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala --- @@ -18,16 +18,17 @@ package org.apache.spark.sql.parser import scala.collection.mutable -import org.apache.spark.sql.{CarbonEnv, SparkSession} -import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, ParseException, SqlBaseParser} +import org.apache.spark.sql.{CarbonClassReflectionUtils, CarbonEnv, SparkSession} +import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, AstBuilder, ParseException, SqlBaseParser} import org.apache.spark.sql.catalyst.parser.ParserUtils._ -import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext, TablePropertyListContext} +import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.command.{CarbonCreateTableCommand, PartitionerField, TableModel} import org.apache.spark.sql.execution.SparkSqlAstBuilder -import org.apache.spark.sql.execution.command.{BucketFields, CarbonCreateTableCommand, Field, PartitionerField, TableModel} import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution} +import org.apache.spark.util.Utils --- End diff -- Please remove all unused imports --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153062664 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala --- @@ -69,12 +71,11 @@ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends Ab } } -class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) { - - val parser = new CarbonSpark2SqlParser +class CarbonHelperqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) --- End diff -- Correct the name --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153062707 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala --- @@ -184,10 +126,86 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) { } } - private def needToConvertToLowerCase(key: String): Boolean = { - val noConvertList = Array("LIST_INFO", "RANGE_INFO") - !noConvertList.exists(x => x.equalsIgnoreCase(key)); + def getPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] + = { + Option(ctx).map(visitPropertyKeyValues) + .getOrElse(Map.empty) } + def createCarbontable(tableHeader: CreateTableHeaderContext, --- End diff -- Typo: correct the name to `createCarbonTable` --- |
Free forum by Nabble | Edit this page |