Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153076329

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.execution.command.DescribeTableCommand
+import org.apache.spark.sql.types.DataType
+
+/**
+ * This class contains the wrappers of all the case classes which are common
+ * across Spark 2.1 and 2.2 but have changes in their parameter lists.
+ * Below are the overridden unapply methods that make pattern matching work
+ * across both Spark 2.1 and Spark 2.2.
+ */
+object CarbonExpressions {
+
+  /**
+   * unapply method of Cast class.
+   */
+  object MatchCast {
+    def unapply(expr: Expression): Option[(Attribute, DataType)] = {
+      if (expr.isInstanceOf[Cast] && expr.asInstanceOf[Cast].child.isInstanceOf[Attribute]) {
+        Some((expr.asInstanceOf[Cast].child.asInstanceOf[Attribute], expr.asInstanceOf[Cast].child
+          .dataType))
+      } else {
+        None
+      }
+    }
+  }
+
+  /**
+   * unapply method of Describe Table format.
+   */
+  object CarbonDescribeTable {
+    def unapply(plan: LogicalPlan): Option[(TableIdentifier, TablePartitionSpec, Boolean)] = {
+      if (plan.isInstanceOf[DescribeTableCommand]) {
+        val describeTableCommand = plan.asInstanceOf[DescribeTableCommand]
+        if (describeTableCommand.table.isInstanceOf[TableIdentifier] &&
+            describeTableCommand.partitionSpec.isInstanceOf[TablePartitionSpec] &&
+            describeTableCommand.isExtended.isInstanceOf[Boolean]) {
+          Some(describeTableCommand.table,
+            describeTableCommand.partitionSpec,
+            describeTableCommand.isExtended)
+        } else {
+          None
+        }
+      } else {
+        None
+      }
+    }
+  }
+
+  /**
+   * unapply method of SubqueryAlias.
+   */
+  object CarbonSubqueryAlias {
+    def unapply(plan: LogicalPlan): Option[(String, LogicalPlan)] = {
+      if (plan.isInstanceOf[SubqueryAlias]) {
+        val subqueryAlias = plan.asInstanceOf[SubqueryAlias]
+        if (subqueryAlias.alias.isInstanceOf[String] &&
+            subqueryAlias.child.isInstanceOf[LogicalPlan]) {
+          Some(subqueryAlias.alias,
+            subqueryAlias.child)
+        } else {
+          None
+        }
+      } else {
+        None
+      }
+    }
+  }
+
+  /**
+   * unapply method of UnresolvedRelation.
+   */
+  object CarbonUnresolvedRelation {
+    def unapply(plan: LogicalPlan): Option[(TableIdentifier)] = {
+      if (plan.isInstanceOf[UnresolvedRelation]) {
--- End diff --

Done

---
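For context: the extractors above decouple pattern matching from the constructor arity of the underlying Spark case classes, so callers only depend on the shape of the unapply result. A minimal sketch of how a caller might consume MatchCast; the describeCast helper is hypothetical, not part of the PR:

import org.apache.spark.sql.CarbonExpressions.MatchCast
import org.apache.spark.sql.catalyst.expressions.Expression

// Hypothetical caller: pull the child attribute and its data type out of a
// Cast without touching Cast's version-specific constructor parameters.
def describeCast(expr: Expression): Option[String] = expr match {
  case MatchCast(attribute, dataType) =>
    Some(s"cast of column ${attribute.name} with child type $dataType")
  case _ => None
}
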
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153076333

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---
@@ -19,10 +19,10 @@ package org.apache.spark.sql

 import java.io.File

 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext
--- End diff --

Done

---
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153076376

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---
@@ -43,14 +43,15 @@ class CarbonSession(@transient val sc: SparkContext,
   }

   @transient
-  override lazy val sessionState: SessionState = new CarbonSessionState(this)
+  override lazy val sessionState: SessionState = CarbonClassReflectionUtils
+    .getSessionState(sparkContext, this)
--- End diff --

Done

---
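The reflection helper itself is not shown in this diff. Below is a minimal sketch of what a version-switching lookup could look like; the version check and both class names are assumptions for illustration, not the actual CarbonClassReflectionUtils code:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SessionState

// Hypothetical sketch: choose the session-state class by the running Spark
// version and instantiate it reflectively, so the calling file compiles
// against both Spark 2.1 and 2.2. The class names are placeholders.
def getSessionState(sparkContext: SparkContext, session: SparkSession): SessionState = {
  val className = if (sparkContext.version.startsWith("2.1")) {
    "org.apache.spark.sql.hive.CarbonSessionState"        // assumed 2.1 name
  } else {
    "org.apache.spark.sql.hive.CarbonSessionStateBuilder" // assumed 2.2 name
  }
  val constructor = Class.forName(className).getConstructors.head
  constructor.newInstance(session).asInstanceOf[SessionState]
}
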
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153076398

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala ---
@@ -21,12 +21,14 @@ import java.text.{ParseException, SimpleDateFormat}
 import java.util
 import java.util.{Locale, TimeZone}

+import scala.Option
 import scala.collection.JavaConverters._

-import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, EmptyRow, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, LessThan, LessThanOrEqual, Literal, Not}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, EmptyRow, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, LessThan, LessThanOrEqual, Literal, Not}
 import org.apache.spark.sql.CastExpr
 import org.apache.spark.sql.sources
-import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, TimestampType}
+import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, StringType, TimestampType}
+import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast}
--- End diff --

Ok

---
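One detail worth calling out: the final renaming import binds the local name Cast to the MatchCast extractor, so pre-existing `case Cast(...)` pattern matches in the file compile unchanged against the version-stable unapply. A small sketch of the same trick; the isCastOfStringColumn helper is hypothetical:

import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.StringType

// Because of the renaming import above, this `Cast` pattern resolves to
// MatchCast.unapply, which has the same result shape on both Spark versions.
def isCastOfStringColumn(expr: Expression): Boolean = expr match {
  case Cast(_, dataType) => dataType == StringType
  case _ => false
}
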
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153076407

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDataTypeChangeCommand.scala ---
@@ -19,9 +19,10 @@ package org.apache.spark.sql.execution.command.schema

 import scala.collection.JavaConverters._

+import org.apache.hadoop.hive.ql.session.SessionState
--- End diff --

Done

---
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153076418

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDataTypeChangeCommand.scala ---
@@ -94,10 +95,7 @@ private[sql] case class AlterTableDataTypeChangeCommand(
       tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
         .setTime_stamp(System.currentTimeMillis)
       AlterTableUtil
-        .updateSchemaInfo(carbonTable,
-          schemaEvolutionEntry,
-          tableInfo)(sparkSession,
-          sparkSession.sessionState.asInstanceOf[CarbonSessionState])
+        .updateSchemaInfo(carbonTable, schemaEvolutionEntry, tableInfo)(sparkSession)
--- End diff --

Done

---
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153076436

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDropColumnCommand.scala ---
@@ -20,9 +20,10 @@ package org.apache.spark.sql.execution.command.schema
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer

+import org.apache.hadoop.hive.ql.session.SessionState
--- End diff --

Done

---
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153076449

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDropColumnCommand.scala ---
@@ -120,10 +121,7 @@ private[sql] case class AlterTableDropColumnCommand(
       val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
       schemaEvolutionEntry.setRemoved(deletedColumnSchema.toList.asJava)
       AlterTableUtil
-        .updateSchemaInfo(carbonTable,
-          schemaEvolutionEntry,
-          tableInfo)(sparkSession,
-          sparkSession.sessionState.asInstanceOf[CarbonSessionState])
+        .updateSchemaInfo(carbonTable, schemaEvolutionEntry, tableInfo)(sparkSession)
--- End diff --

Done

---
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153076462

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala ---
@@ -42,6 +43,7 @@ import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
 import org.apache.carbondata.spark.util.CarbonScalaUtil

+
--- End diff --

Done

---
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153076471

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---
@@ -26,13 +26,19 @@ import org.apache.spark.sql.execution.command.management.{AlterTableCompactionCo
 import org.apache.spark.sql.execution.command.partition.ShowCarbonPartitionsCommand
 import org.apache.spark.sql.execution.command.schema.{AlterTableAddColumnCommand, AlterTableDataTypeChangeCommand, AlterTableDropColumnCommand, AlterTableRenameTableCommand}
 import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
+import org.apache.spark.sql.CarbonExpressions.{CarbonDescribeTable => DescribeTableCommand}
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes
--- End diff --

Done

---
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153076491

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---
@@ -26,13 +26,19 @@ import org.apache.spark.sql.execution.command.management.{AlterTableCompactionCo
 import org.apache.spark.sql.execution.command.partition.ShowCarbonPartitionsCommand
 import org.apache.spark.sql.execution.command.schema.{AlterTableAddColumnCommand, AlterTableDataTypeChangeCommand, AlterTableDropColumnCommand, AlterTableRenameTableCommand}
 import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
+import org.apache.spark.sql.CarbonExpressions.{CarbonDescribeTable => DescribeTableCommand}
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes

 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException

 /**
  * Carbon strategies for ddl commands
  */
+case class CarbonDescribeTableCommand (
--- End diff --

Done

---
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153076517

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---
@@ -155,6 +161,14 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         val cmd = CreateDataSourceTableCommand(updatedCatalog,
           ignoreIfExists = mode == SaveMode.Ignore)
         ExecutedCommandExec(cmd) :: Nil
+      case CreateDataSourceTableCommand(table, ignoreIfExists)
+        if table.provider.get != DDLUtils.HIVE_PROVIDER
+           && table.provider.get.equals("org.apache.spark.sql.CarbonSource") =>
+        val updatedCatalog =
+          CarbonSource.updateCatalogTableWithCarbonSchema(table, sparkSession)
--- End diff --

Done

---
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153076529

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---
@@ -155,6 +161,14 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         val cmd = CreateDataSourceTableCommand(updatedCatalog,
           ignoreIfExists = mode == SaveMode.Ignore)
         ExecutedCommandExec(cmd) :: Nil
+      case CreateDataSourceTableCommand(table, ignoreIfExists)
+        if table.provider.get != DDLUtils.HIVE_PROVIDER
+           && table.provider.get.equals("org.apache.spark.sql.CarbonSource") =>
+        val updatedCatalog =
+          CarbonSource.updateCatalogTableWithCarbonSchema(table, sparkSession)
+        val cmd =
+          CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists)
--- End diff --

Done

---
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153076559

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---
@@ -24,17 +24,15 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, NamedExpress
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.SparkSqlParser
 import org.apache.spark.sql.execution.command.mutation.ProjectForDeleteCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.CarbonExpressions.CarbonUnresolvedRelation

 import org.apache.carbondata.core.constants.CarbonCommonConstants

-/**
- * Insert into carbon table from other source
- */
--- End diff --

Moved back.

---
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153076600

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---
@@ -58,6 +57,8 @@ object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
       )
     }
     if (child.output.size >= relation.carbonRelation.output.size) {
+      sparkVersion21 = !CarbonClassReflectionUtils.hasField("query", InsertIntoCarbonTable)
--- End diff --

In some places the SparkContext or SparkSession is not directly available. We therefore use the approach of checking for the field via reflection and, if it is not present, assuming we are on the other Spark version.

---
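A minimal sketch of what such a field-presence probe could look like; this is an illustration, not the actual CarbonClassReflectionUtils.hasField implementation:

// Hypothetical sketch: if the compiled class declares a field with this
// name, we are on the Spark version that added it; otherwise we assume the
// other version. Useful where no SparkContext or SparkSession is at hand
// to ask for the version string.
def hasField(fieldName: String, obj: Any): Boolean = {
  try {
    obj.getClass.getDeclaredField(fieldName)
    true
  } catch {
    case _: NoSuchFieldException => false
  }
}
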
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153094267

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---
@@ -95,12 +107,23 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
   def prepareTargetReleation(relation: UnresolvedRelation): SubqueryAlias = {
     val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId",
       Seq.empty, isDistinct = false), "tupleId")())
+
+    val localAlias = alias match {
+      case Some(a) => Some(alias.toSeq)
+      case _ => None
+    }
--- End diff --

Done

---
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153094284

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---
@@ -95,12 +107,23 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
   def prepareTargetReleation(relation: UnresolvedRelation): SubqueryAlias = {
     val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId",
       Seq.empty, isDistinct = false), "tupleId")())
+
+    val localAlias = alias match {
+      case Some(a) => Some(alias.toSeq)
+      case _ => None
+    }
     val projList = Seq(
-      UnresolvedAlias(UnresolvedStar(Option(table.alias.toSeq))), tupleId)
-    // include tuple id and rest of the required columns in subqury
-    SubqueryAlias(table.alias.getOrElse(""),
-      Project(projList, relation), Option(table.tableIdentifier))
+      UnresolvedAlias(UnresolvedStar(localAlias)), tupleId)
+
+    val subqueryAlias =
--- End diff --

Done

---
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153094337

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---
@@ -95,12 +107,23 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
   def prepareTargetReleation(relation: UnresolvedRelation): SubqueryAlias = {
     val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId",
       Seq.empty, isDistinct = false), "tupleId")())
+
+    val localAlias = alias match {
+      case Some(a) => Some(alias.toSeq)
+      case _ => None
+    }
     val projList = Seq(
-      UnresolvedAlias(UnresolvedStar(Option(table.alias.toSeq))), tupleId)
-    // include tuple id and rest of the required columns in subqury
-    SubqueryAlias(table.alias.getOrElse(""),
-      Project(projList, relation), Option(table.tableIdentifier))
+      UnresolvedAlias(UnresolvedStar(localAlias)), tupleId)
+
+    val subqueryAlias =
+      CarbonClassReflectionUtils
+        .getSubqueryAlias(sparkSession,
+          alias,
+          Project(projList, relation),
+          Some(table.tableIdentifier))
+    subqueryAlias
--- End diff --

Done

---
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153094428

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---
@@ -143,52 +246,250 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
       selectPlan
     }
     val finalPlan = if (filter.length > 0) {
-      val alias = table.alias.getOrElse("")
       var transformed: Boolean = false
       // Create a dummy projection to include filter conditions
       var newPlan: LogicalPlan = null
       if (table.tableIdentifier.database.isDefined) {
         newPlan = parser.parsePlan("select * from " +
-          table.tableIdentifier.database.getOrElse("") + "." +
-          table.tableIdentifier.table + " " + alias + " " + filter)
+                                   table.tableIdentifier.database.getOrElse("") + "." +
+                                   table.tableIdentifier.table + " " + alias.getOrElse("") + " " +
+                                   filter)
--- End diff --

Done

---
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r153094509

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---
@@ -143,52 +168,63 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
       selectPlan
     }
     val finalPlan = if (filter.length > 0) {
-      val alias = table.alias.getOrElse("")
       var transformed: Boolean = false
       // Create a dummy projection to include filter conditions
       var newPlan: LogicalPlan = null
       if (table.tableIdentifier.database.isDefined) {
         newPlan = parser.parsePlan("select * from " +
-          table.tableIdentifier.database.getOrElse("") + "." +
-          table.tableIdentifier.table + " " + alias + " " + filter)
+                                   table.tableIdentifier.database.getOrElse("") + "." +
+                                   table.tableIdentifier.table + " " + alias.getOrElse("") + " " +
+                                   filter)
       } else {
         newPlan = parser.parsePlan("select * from " +
-          table.tableIdentifier.table + " " + alias + " " + filter)
+                                   table.tableIdentifier.table + " " + alias.getOrElse("") + " " +
+                                   filter)
       }
       newPlan transform {
-        case UnresolvedRelation(t, Some(a))
-          if !transformed && t == table.tableIdentifier && a == alias =>
+        case CarbonUnresolvedRelation(t)
+          if !transformed && t == table.tableIdentifier =>
           transformed = true
-          // Add the filter condition of update statement on destination table
-          SubqueryAlias(alias, updatedSelectPlan, Option(table.tableIdentifier))
+
+          val subqueryAlias = CarbonClassReflectionUtils
+            .getSubqueryAlias(sparkSession, alias, updatedSelectPlan, Some(table.tableIdentifier))
+          subqueryAlias
--- End diff --

Done

---
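For context: SubqueryAlias takes three constructor parameters on Spark 2.1 (alias, child, view) but only two on Spark 2.2 (alias, child), which is why construction is routed through reflection here. A minimal sketch of such a helper, assuming constructor arity is the discriminator; this is an illustration, not the actual CarbonClassReflectionUtils code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}

// Hypothetical sketch: pick the SubqueryAlias constructor by arity and
// invoke it reflectively, passing the view argument only on Spark 2.1.
// The sparkSession parameter mirrors the signature seen in the diff.
def getSubqueryAlias(
    sparkSession: SparkSession,
    alias: Option[String],
    child: LogicalPlan,
    view: Option[TableIdentifier]): SubqueryAlias = {
  val constructor = classOf[SubqueryAlias].getConstructors.head
  val args: Array[AnyRef] = if (constructor.getParameterCount == 3) {
    Array(alias.getOrElse(""), child, view)  // Spark 2.1: (alias, child, view)
  } else {
    Array(alias.getOrElse(""), child)        // Spark 2.2: (alias, child)
  }
  constructor.newInstance(args: _*).asInstanceOf[SubqueryAlias]
}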