kunal642 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r646308065 ########## File path: examples/spark/src/main/scala/org/apache/carbondata/examples/LuceneIndexExample.scala ########## @@ -60,40 +60,6 @@ object LuceneIndexExample { | AS 'lucene' """.stripMargin) - // 1. Compare the performance: - - def time(code: => Unit): Double = { - val start = System.currentTimeMillis() - code - // return time in second - (System.currentTimeMillis() - start).toDouble / 1000 - } - - val timeWithoutLuceneIndex = time { - - spark.sql( - s""" - | SELECT count(*) - | FROM personTable where id like '% test1 %' - """.stripMargin).show() - - } - - val timeWithLuceneIndex = time { - - spark.sql( Review comment: ignore this run in RunExamples.scala and revert the change here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
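For illustration only, "ignore this run in RunExamples.scala" could be done with ScalaTest's `ignore` in place of `test`; the suite below is a hedged sketch (the class name and the commented-out invocation are assumptions, not the actual structure of RunExamples.scala):

```scala
import org.scalatest.funsuite.AnyFunSuite

class RunExamplesSketch extends AnyFunSuite {
  // `ignore` keeps the case registered in the suite but skips its execution,
  // which is one way to leave the Lucene timing comparison out of the CI run.
  ignore("LuceneIndexExample") {
    // LuceneIndexExample.main(Array.empty)  // hypothetical invocation
  }
}
```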
kunal642 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r646308564 ########## File path: index/examples/pom.xml ########## @@ -81,9 +81,6 @@ <profiles> <profile> <id>spark-2.3</id> - <activation> - <activeByDefault>true</activeByDefault> - </activation> Review comment: add 3.1 profile -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
kunal642 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r646310399 ########## File path: integration/flink/pom.xml ########## @@ -30,6 +30,11 @@ <artifactId>carbondata-format</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>com.thoughtworks.paranamer</groupId> Review comment: add LICENSE for this library. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
kunal642 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r646310548 ########## File path: integration/flink/pom.xml ########## @@ -193,9 +220,7 @@ <profiles> <profile> <id>spark-2.3</id> - <activation> - <activeByDefault>true</activeByDefault> - </activation> + <activation><activeByDefault>true</activeByDefault></activation> Review comment: revert this change -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
kunal642 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r646318477 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala ########## @@ -86,6 +87,11 @@ class SparkCarbonFileFormat extends FileFormat options: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { val conf = sparkSession.sessionState.newHadoopConf() + if (options.isEmpty && files.isEmpty) { Review comment: remove this code -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
kunal642 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r646319099 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala ########## @@ -65,7 +65,10 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { protected val ESCAPECHAR = carbonKeyWord("ESCAPECHAR") protected val EXCLUDE = carbonKeyWord("EXCLUDE") protected val EXPLAIN = carbonKeyWord("EXPLAIN") - protected val EXTENDED = carbonKeyWord("EXTENDED") + protected val MODE = carbonKeyWord("EXTENDED") | + carbonKeyWord("CODEGEN") | + carbonKeyWord("COST") | + carbonKeyWord("FORMATTED") Review comment: create JIRA for other MODES and add a comment here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
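For context, the four keywords behind the new MODE rule correspond to Spark's own EXPLAIN variants. A minimal, self-contained sketch that exercises each of them on Spark 3.1 (the temp view name `t` and the object name are made up; this is not the PR's test code):

```scala
import org.apache.spark.sql.SparkSession

object ExplainModesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("explain-modes-sketch")
      .getOrCreate()
    spark.range(10).createOrReplaceTempView("t")
    // EXTENDED/CODEGEN/COST/FORMATTED are the modes the parser change above accepts;
    // the empty string exercises the default (simple) mode.
    Seq("", "EXTENDED", "CODEGEN", "COST", "FORMATTED").foreach { mode =>
      spark.sql(s"EXPLAIN $mode SELECT id, count(*) FROM t GROUP BY id").show(truncate = false)
    }
    spark.stop()
  }
}
```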
kunal642 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r646323700 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonExtensionSpark2SqlParser.scala ########## @@ -56,4 +58,13 @@ class CarbonExtensionSpark2SqlParser extends CarbonSpark2SqlParser { CarbonSparkSqlParserUtil.loadDataNew( databaseNameOp, tableName, Option(optionsList), partitions, filePath, isOverwrite) } + + /** + * REFRESH MATERIALIZED VIEW mv_name + */ + private lazy val refreshTable: Parser[LogicalPlan] = Review comment: why is this needed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
kunal642 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r646326285 ########## File path: streaming/src/main/spark2.x/org.apache.carbondata.util/SparkStreamingUtil.scala ########## @@ -15,14 +15,15 @@ * limitations under the License. */ -package org.apache.carbondata.spark.adapter +package org.apache.carbondata.util -import scala.collection.mutable.ArrayBuffer +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} +object SparkStreamingUtil { Review comment: the package directory structure uses "." instead of "/" to separate packages -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
kunal642 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r646327536 ########## File path: mv/plan/src/main/spark2.4/org/apache/carbondata/mv/plans/modular/ExpressionHelper.scala ########## @@ -17,9 +17,11 @@ package org.apache.carbondata.mv.plans.modular -import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, ExprId, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSeq, Expression, ExprId, NamedExpression, SubqueryExpression} Review comment: revert this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
kunal642 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r646328572 ########## File path: mv/plan/src/main/spark3.1/org/apache/carbondata/mv/plans/modular/SparkVersionHelper.scala ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.mv.plans.modular + +import scala.reflect.ClassTag + +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeSeq, Expression, ExprId, NamedExpression, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, AggregateMode} +import org.apache.spark.sql.catalyst.optimizer.{BooleanSimplification, CollapseProject, CollapseRepartition, CollapseWindow, ColumnPruning, CombineFilters, CombineUnions, ConstantFolding, EliminateLimits, EliminateOuterJoin, EliminateSerialization, EliminateSorts, FoldablePropagation, NullPropagation, PushDownPredicates, PushPredicateThroughJoin, PushProjectionThroughUnion, RemoveDispensableExpressions, RemoveRedundantAliases, ReorderAssociativeOperator, ReorderJoin, RewriteCorrelatedScalarSubquery, SimplifyBinaryComparison, SimplifyCaseConversionExpressions, SimplifyCasts, SimplifyConditionals} +import org.apache.spark.sql.catalyst.plans.{logical, JoinType, QueryPlan} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Join, LogicalPlan, Statistics, Subquery} +import org.apache.spark.sql.catalyst.rules.Rule + +import org.apache.carbondata.mv.plans.util.BirdcageOptimizer + +object SparkVersionHelper { + def getStatisticsObj(outputList: Seq[NamedExpression], + plan: LogicalPlan, stats: Statistics, + aliasMap: Option[AttributeMap[Attribute]] = None): Statistics = { + val output = outputList.map(_.toAttribute) + val mapSeq = plan.collect { case n: logical.LeafNode => n }.map { + table => AttributeMap(table.output.zip(output)) + } + val rewrites = mapSeq.head + val attributes: AttributeMap[ColumnStat] = stats.attributeStats + var attributeStats = AttributeMap(attributes.iterator + .map { pair => (rewrites(pair._1), pair._2) }.toSeq) + if (aliasMap.isDefined) { + attributeStats = AttributeMap( + attributeStats.map(pair => (aliasMap.get(pair._1), pair._2)).toSeq) + } + Statistics(stats.sizeInBytes, stats.rowCount, attributeStats) + } + + def getOptimizedPlan(s: SubqueryExpression): LogicalPlan = { + val Subquery(newPlan, _) = BirdcageOptimizer.execute(Subquery.fromExpression(s)) + newPlan + } + + def normalizeExpressions[T <: Expression](r: T, attrs: AttributeSeq): T = { + QueryPlan.normalizeExpressions(r, attrs) + } + + def attributeMap(rAliasMap: AttributeMap[Attribute]) : AttributeMap[Alias] = { + rAliasMap.asInstanceOf[AttributeMap[Alias]] + } + + def seqOfRules : 
Seq[Rule[LogicalPlan]] = { + Seq( + // Operator push down + PushProjectionThroughUnion, + ReorderJoin, + EliminateOuterJoin, + PushPredicateThroughJoin, + PushDownPredicates, + ColumnPruning, + // Operator combine + CollapseRepartition, + CollapseProject, + CollapseWindow, + CombineFilters, + EliminateLimits, + CombineUnions, + // Constant folding and strength reduction + NullPropagation, + FoldablePropagation, + ConstantFolding, + ReorderAssociativeOperator, + // No need to apply LikeSimplification rule while creating MV + // as modular plan asCompactSql will be set in schema + // LikeSimplification, + BooleanSimplification, + SimplifyConditionals, + RemoveDispensableExpressions, + SimplifyBinaryComparison, + EliminateSorts, + SimplifyCasts, + SimplifyCaseConversionExpressions, + RewriteCorrelatedScalarSubquery, + EliminateSerialization, + RemoveRedundantAliases) + } + +} + +trait getVerboseString extends LeafNode { + def verboseString: String = toString +} + +trait groupByUnaryNode extends UnaryNode { + override def verboseString(maxFields: Int): String = super.verboseString(maxFields) + + override def mapProductIterator[B](f: Any => B)(implicit evidence$1: ClassTag[B]): Array[B] = { + super.mapProductIterator(f) + } + + override def mapChildren(f: ModularPlan => ModularPlan) : ModularPlan = { + val groupBy = super.mapChildren(f) + if (this.rewritten && !groupBy.rewritten) { + groupBy.setRewritten() + } + groupBy + } +} + +trait selectModularPlan extends ModularPlan { Review comment: Change to "SelectModularPlan" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
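The requested renames simply follow Scala's UpperCamelCase convention for type names. A sketch of the new names (bodies elided here; the real traits extend LeafNode, UnaryNode, and ModularPlan as in the diff above):

```scala
// Illustrative only: the version-specific helper traits renamed to UpperCamelCase.
trait GetVerboseString
trait GroupByUnaryNode
trait SelectModularPlan
trait UnionModularPlan
```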
kunal642 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r646332185 ########## File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala ########## @@ -232,9 +236,9 @@ object CarbonToSparkAdapter { } class CarbonOptimizer( - session: SparkSession, - catalog: SessionCatalog, - optimizer: Optimizer) extends Optimizer(catalog) { + session: SparkSession, Review comment: revert this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
kunal642 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r646333532 ########## File path: integration/spark/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala ########## @@ -187,11 +193,10 @@ object CarbonToSparkAdapter { } } - class CarbonOptimizer( - session: SparkSession, - catalog: SessionCatalog, - optimizer: Optimizer) extends Optimizer(catalog) { + session: SparkSession, Review comment: revert this change -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
Indhumathi27 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r646336533 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala ########## @@ -86,6 +87,11 @@ class SparkCarbonFileFormat extends FileFormat options: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { val conf = sparkSession.sessionState.newHadoopConf() + if (options.isEmpty && files.isEmpty) { Review comment: the path is removed from `options` in Spark 3.1, so this change is required to throw a proper exception -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
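A minimal sketch of the guard being discussed, assuming the intent is simply to fail fast when Spark 3.1 no longer passes the path through `options`; the helper name and exception type below are illustrative, not the PR's exact code:

```scala
import org.apache.hadoop.fs.FileStatus

object InferSchemaGuardSketch {
  // If Spark 3.1 strips the "path" entry from `options` and no files were resolved,
  // there is nothing to read a CarbonData schema from, so raise a clear error up front
  // instead of failing later with an obscure one.
  def checkInput(options: Map[String, String], files: Seq[FileStatus]): Unit = {
    if (options.isEmpty && files.isEmpty) {
      throw new IllegalArgumentException(
        "Unable to infer schema for CarbonData: no path in options and no input files")
    }
  }
}
```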
Indhumathi27 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r646345275 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonExtensionSpark2SqlParser.scala ########## @@ -56,4 +58,13 @@ class CarbonExtensionSpark2SqlParser extends CarbonSpark2SqlParser { CarbonSparkSqlParserUtil.loadDataNew( databaseNameOp, tableName, Option(optionsList), partitions, filePath, isOverwrite) } + + /** + * REFRESH MATERIALIZED VIEW mv_name + */ + private lazy val refreshTable: Parser[LogicalPlan] = Review comment: Spark 3.1 has added REFRESH TABLE to SparkSqlParser, and it fails with UnresolvedTableOrView, so Carbon needs to handle it. @vikramahuja1001 please check and remove it from DDLStrategy if it is not used -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
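A self-contained sketch of what such a parser-combinator rule can look like; the object name, result type, and keyword regexes below are assumptions for illustration, not CarbonSpark2SqlParser's actual code (the real rule would produce a Carbon command/LogicalPlan):

```scala
import scala.util.parsing.combinator.JavaTokenParsers

// Illustrative REFRESH MATERIALIZED VIEW grammar with an optional database prefix.
object RefreshMvParserSketch extends JavaTokenParsers {
  case class RefreshMv(dbName: Option[String], mvName: String)

  private lazy val refreshMv: Parser[RefreshMv] =
    "(?i)REFRESH".r ~> "(?i)MATERIALIZED".r ~> "(?i)VIEW".r ~>
      opt(ident <~ ".") ~ ident ^^ { case db ~ mv => RefreshMv(db, mv) }

  def parse(sql: String): RefreshMv = parseAll(refreshMv, sql).get
}

// e.g. RefreshMvParserSketch.parse("REFRESH MATERIALIZED VIEW db1.mv1")
```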
akashrn5 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r646379260 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala ########## @@ -72,7 +72,7 @@ abstract class CarbonRDD[T: ClassTag]( def internalCompute(split: Partition, context: TaskContext): Iterator[T] final def compute(split: Partition, context: TaskContext): Iterator[T] = { - TaskContext.get.addTaskCompletionListener(_ => ThreadLocalSessionInfo.unsetAll()) + SparkVersionAdapter.addTaskCompletionListener(ThreadLocalSessionInfo.unsetAll()) Review comment: please ad the scala bug in the comment of scala method `SparkVersionAdapter` ########## File path: integration/spark/src/main/common2.4and3.1/org/apache/spark/sql/CarbonBoundReference.scala ########## @@ -42,4 +42,4 @@ case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nu override def qualifier: Seq[String] = null override def newInstance(): NamedExpression = throw new UnsupportedOperationException -} \ No newline at end of file +} Review comment: revert this ########## File path: integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/SparkVersionAdapter.scala ########## @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import scala.collection.mutable + +import org.antlr.v4.runtime.tree.TerminalNode +import org.apache.spark.{SparkContext, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.serializer.Serializer +import org.apache.spark.sql.catalyst.{CarbonParserUtil, InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedRelation} +import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeSeq, Expression, NamedExpression, SortOrder} +import org.apache.spark.sql.catalyst.expressions.codegen.{ExprCode, GeneratePredicate} +import org.apache.spark.sql.catalyst.parser.ParserUtils.operationNotAllowed +import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{BucketSpecContext, ColTypeListContext, CreateTableHeaderContext, LocationSpecContext, QueryContext, SkewSpecContext, TablePropertyListContext} +import org.apache.spark.sql.catalyst.plans.{logical, JoinType, QueryPlan} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, InsertIntoTable, Join, LogicalPlan, OneRowRelation, Statistics} +import org.apache.spark.sql.catalyst.plans.physical.SinglePartition +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.{QueryExecution, ShuffledRowRDD, SparkPlan, SQLExecution, UnaryExecNode} +import org.apache.spark.sql.execution.command.{ExplainCommand, Field, PartitionerField, TableModel, TableNewProcessor} +import org.apache.spark.sql.execution.command.table.{CarbonCreateTableAsSelectCommand, CarbonCreateTableCommand} +import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, RefreshTable} +import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec +import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide} +import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan +import org.apache.spark.sql.internal.{SessionState, SharedState} +import org.apache.spark.sql.parser.CarbonSpark2SqlParser +import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil.{checkIfDuplicateColumnExists, convertDbNameToLowerCase, validateStreamingProperty} +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.{DataType, StructField} +import org.apache.spark.unsafe.types.UTF8String + +import org.apache.carbondata.common.exceptions.DeprecatedFeatureException +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.datatype.DataTypes +import org.apache.carbondata.core.metadata.schema.SchemaReader +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.spark.CarbonOption +import org.apache.carbondata.spark.util.CarbonScalaUtil + +object SparkVersionAdapter { Review comment: please select the code from 63 to last and correct the style, (Ctrl + Shift + Alt + L) ########## File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala ########## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.parser + +import scala.collection.mutable + +import org.antlr.v4.runtime.tree.TerminalNode +import org.apache.spark.sql.{CarbonThreadUtil, SparkSession, SparkVersionAdapter} +import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, SqlBaseParser} +import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.{SparkSqlAstBuilder, SparkSqlParser} +import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution} +import org.apache.spark.sql.types.StructField +import org.apache.spark.sql.util.CarbonException +import org.apache.spark.util.CarbonReflectionUtils + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.spark.util.CarbonScalaUtil + +/** + * Concrete parser for Spark SQL statements and carbon specific + * statements + */ +class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends SparkSqlParser { + + val parser = new CarbonSpark2SqlParser + + override val astBuilder = CarbonReflectionUtils.getAstBuilder(conf, parser, sparkSession) + + private val substitutor = new VariableSubstitution + + override def parsePlan(sqlText: String): LogicalPlan = { + CarbonThreadUtil.updateSessionInfoToCurrentThread(sparkSession) + try { + val parsedPlan = super.parsePlan(sqlText) + CarbonScalaUtil.cleanParserThreadLocals + parsedPlan + } catch { + case ce: MalformedCarbonCommandException => + CarbonScalaUtil.cleanParserThreadLocals + throw ce + case ex: Throwable => + try { + parser.parse(sqlText) + } catch { + case mce: MalformedCarbonCommandException => + throw mce + case e: Throwable => + CarbonException.analysisException( + s"""== Parse1 == + |${ex.getMessage} + |== Parse2 == + |${e.getMessage} + """.stripMargin.trim) + } + } + } + + protected override def parse[T](command: String)(toResult: SqlBaseParser => T): T = { + super.parse(substitutor.substitute(command))(toResult) + } +} + +class CarbonHelperSqlAstBuilder(conf: SQLConf, + parser: CarbonSpark2SqlParser, + sparkSession: SparkSession) + extends SparkSqlAstBuilderWrapper(conf) { + /** + * Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified. 
+ */ + override def visitPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] = { + val props = visitTablePropertyList(ctx) + CarbonSparkSqlParserUtil.visitPropertyKeyValues(ctx, props) + } + + def getPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] + = { + Option(ctx).map(visitPropertyKeyValues) + .getOrElse(Map.empty) + } + + def createCarbonTable(createTableTuple: (CreateTableHeaderContext, SkewSpecContext, + BucketSpecContext, PartitionFieldListContext, ColTypeListContext, TablePropertyListContext, + LocationSpecContext, Option[String], TerminalNode, QueryContext, String)): LogicalPlan = { + // val parser = new CarbonSpark2SqlParser Review comment: remove this line ########## File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/SparkVersionAdapter.scala ########## @@ -0,0 +1,549 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.net.URI +import java.sql.{Date, Timestamp} +import java.time.ZoneId +import javax.xml.bind.DatatypeConverter + +import scala.annotation.tailrec +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.antlr.v4.runtime.tree.TerminalNode +import org.apache.spark.{SparkContext, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} +import org.apache.spark.serializer.Serializer +import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule +import org.apache.spark.sql.catalyst.{CarbonParserUtil, InternalRow, QueryPlanningTracker, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedRelation} +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, ExternalCatalogWithListener, SessionCatalog} +import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSeq, AttributeSet, Expression, ExpressionSet, ExprId, NamedExpression, Predicate, ScalaUDF, SortOrder, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide, Optimizer} +import org.apache.spark.sql.catalyst.parser.ParserUtils.operationNotAllowed +import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{BucketSpecContext, ColTypeListContext, CreateTableHeaderContext, LocationSpecContext, PartitionFieldListContext, QueryContext, SkewSpecContext, TablePropertyListContext} +import org.apache.spark.sql.catalyst.plans.{logical, JoinType, QueryPlan} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, 
InsertIntoStatement, Join, JoinHint, LogicalPlan, OneRowRelation, QualifiedColType, Statistics, SubqueryAlias} +import org.apache.spark.sql.catalyst.plans.physical.SinglePartition +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode} +import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter} +import org.apache.spark.sql.execution.{ExplainMode, QueryExecution, ShuffledRowRDD, SimpleMode, SparkPlan, SQLExecution, UnaryExecNode} +import org.apache.spark.sql.execution.command.{ExplainCommand, Field, PartitionerField, RefreshTableCommand, TableModel, TableNewProcessor} +import org.apache.spark.sql.execution.command.table.{CarbonCreateTableAsSelectCommand, CarbonCreateTableCommand} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec +import org.apache.spark.sql.execution.metric.SQLShuffleWriteMetricsReporter +import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan +import org.apache.spark.sql.internal.{SessionState, SharedState} +import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule, MVRewriteRule} +import org.apache.spark.sql.parser.CarbonSpark2SqlParser +import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil.{checkIfDuplicateColumnExists, convertDbNameToLowerCase, validateStreamingProperty} +import org.apache.spark.sql.secondaryindex.optimizer.CarbonSITransformationRule +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.{AbstractDataType, CharType, DataType, Metadata, StringType, StructField, VarcharType} +import org.apache.spark.sql.util.SparkSQLUtil +import org.apache.spark.unsafe.types.UTF8String + +import org.apache.carbondata.common.exceptions.DeprecatedFeatureException +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.datatype.DataTypes +import org.apache.carbondata.core.metadata.schema.SchemaReader +import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.geo.{InPolygonJoinUDF, ToRangeListAsStringUDF} +import org.apache.carbondata.spark.CarbonOption +import org.apache.carbondata.spark.util.CarbonScalaUtil + +object SparkVersionAdapter { + + def getExplainCommandObj(logicalPlan: LogicalPlan = OneRowRelation(), + mode: Option[String]) : ExplainCommand = { + ExplainCommand(logicalPlan, ExplainMode.fromString(mode.getOrElse(SimpleMode.name))) + } + + def getExplainCommandObj(mode: Option[String]) : ExplainCommand = { + ExplainCommand(OneRowRelation(), ExplainMode.fromString(mode.getOrElse(SimpleMode.name))) + } + + /** + * As a part of SPARK-24085 Hive tables supports scala subquery for + * the partitioned tables,so Carbon also needs to supports + * @param partitionSet + * @param filterPredicates + * @return + */ + def getPartitionFilter( + partitionSet: AttributeSet, + filterPredicates: Seq[Expression]): Seq[Expression] = { Review comment: correct the style here and check whole file once, select whole file and correct the style once ########## File path: integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala 
########## @@ -1551,26 +1552,28 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll { sql("insert into hive_table select 'a','b','1'"); } - val e2 = intercept[SparkException] { - sql("insert into hive_table2 select 'a','b','binary'"); - } + if (!SparkUtil.isSparkVersionXAndAbove("3")) { + val e2 = intercept[SparkException] { + sql("insert into hive_table2 select 'a','b','binary'"); + } - assert(e2.getMessage.contains( - "Dynamic partition strict mode requires at least one static partition column")) + assert(e2.getMessage.contains( + "Dynamic partition strict mode requires at least one static partition column")) - val eInt2 = intercept[Exception] { - sql("insert into hive_table2 select 'a','b','1'"); - } + val eInt2 = intercept[Exception] { + sql("insert into hive_table2 select 'a','b','1'"); + } - val e3 = intercept[SparkException] { - sql("insert into parquet_table select 'a','b','binary'"); - } + val e3 = intercept[SparkException] { + sql("insert into parquet_table select 'a','b','binary'"); + } - assert(e3.getMessage.contains( - "Dynamic partition strict mode requires at least one static partition column")) + assert(e3.getMessage.contains( + "Dynamic partition strict mode requires at least one static partition column")) - val eInt3 = intercept[Exception] { - sql("insert into parquet_table select 'a','b','1'"); + val eInt3 = intercept[Exception] { + sql("insert into parquet_table select 'a','b','1'"); + } Review comment: revert ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala ########## @@ -169,7 +128,7 @@ case class CarbonDataSourceScan( override protected def doCanonicalize(): CarbonDataSourceScan = { CarbonDataSourceScan( relation, - output.map(QueryPlan.normalizeExprId(_, output)), + a, Review comment: please rename to the proper variable name like `outputAttibutesAfterNormalizingExpressionIds` ########## File path: mv/plan/src/main/spark3.1/org/apache/carbondata/mv/plans/modular/SparkVersionHelper.scala ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.mv.plans.modular + +import scala.reflect.ClassTag + +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeSeq, Expression, ExprId, NamedExpression, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, AggregateMode} +import org.apache.spark.sql.catalyst.optimizer.{BooleanSimplification, CollapseProject, CollapseRepartition, CollapseWindow, ColumnPruning, CombineFilters, CombineUnions, ConstantFolding, EliminateLimits, EliminateOuterJoin, EliminateSerialization, EliminateSorts, FoldablePropagation, NullPropagation, PushDownPredicates, PushPredicateThroughJoin, PushProjectionThroughUnion, RemoveDispensableExpressions, RemoveRedundantAliases, ReorderAssociativeOperator, ReorderJoin, RewriteCorrelatedScalarSubquery, SimplifyBinaryComparison, SimplifyCaseConversionExpressions, SimplifyCasts, SimplifyConditionals} +import org.apache.spark.sql.catalyst.plans.{logical, JoinType, QueryPlan} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Join, LogicalPlan, Statistics, Subquery} +import org.apache.spark.sql.catalyst.rules.Rule + +import org.apache.carbondata.mv.plans.util.BirdcageOptimizer + +object SparkVersionHelper { + def getStatisticsObj(outputList: Seq[NamedExpression], + plan: LogicalPlan, stats: Statistics, + aliasMap: Option[AttributeMap[Attribute]] = None): Statistics = { + val output = outputList.map(_.toAttribute) + val mapSeq = plan.collect { case n: logical.LeafNode => n }.map { + table => AttributeMap(table.output.zip(output)) + } + val rewrites = mapSeq.head + val attributes: AttributeMap[ColumnStat] = stats.attributeStats + var attributeStats = AttributeMap(attributes.iterator + .map { pair => (rewrites(pair._1), pair._2) }.toSeq) + if (aliasMap.isDefined) { + attributeStats = AttributeMap( + attributeStats.map(pair => (aliasMap.get(pair._1), pair._2)).toSeq) + } + Statistics(stats.sizeInBytes, stats.rowCount, attributeStats) + } + + def getOptimizedPlan(s: SubqueryExpression): LogicalPlan = { + val Subquery(newPlan, _) = BirdcageOptimizer.execute(Subquery.fromExpression(s)) + newPlan + } + + def normalizeExpressions[T <: Expression](r: T, attrs: AttributeSeq): T = { + QueryPlan.normalizeExpressions(r, attrs) + } + + def attributeMap(rAliasMap: AttributeMap[Attribute]) : AttributeMap[Alias] = { + rAliasMap.asInstanceOf[AttributeMap[Alias]] + } + + def seqOfRules : Seq[Rule[LogicalPlan]] = { + Seq( + // Operator push down + PushProjectionThroughUnion, + ReorderJoin, + EliminateOuterJoin, + PushPredicateThroughJoin, + PushDownPredicates, + ColumnPruning, + // Operator combine + CollapseRepartition, + CollapseProject, + CollapseWindow, + CombineFilters, + EliminateLimits, + CombineUnions, + // Constant folding and strength reduction + NullPropagation, + FoldablePropagation, + ConstantFolding, + ReorderAssociativeOperator, + // No need to apply LikeSimplification rule while creating MV + // as modular plan asCompactSql will be set in schema + // LikeSimplification, + BooleanSimplification, + SimplifyConditionals, + RemoveDispensableExpressions, + SimplifyBinaryComparison, + EliminateSorts, + SimplifyCasts, + SimplifyCaseConversionExpressions, + RewriteCorrelatedScalarSubquery, + EliminateSerialization, + RemoveRedundantAliases) + } + +} + +trait getVerboseString extends LeafNode { + def verboseString: String = toString +} + +trait groupByUnaryNode extends UnaryNode { + override def verboseString(maxFields: Int): String = 
super.verboseString(maxFields) + + override def mapProductIterator[B](f: Any => B)(implicit evidence$1: ClassTag[B]): Array[B] = { + super.mapProductIterator(f) + } + + override def mapChildren(f: ModularPlan => ModularPlan) : ModularPlan = { + val groupBy = super.mapChildren(f) + if (this.rewritten && !groupBy.rewritten) { + groupBy.setRewritten() + } + groupBy + } +} + +trait selectModularPlan extends ModularPlan { + override def verboseString(maxFields: Int): String = super.verboseString(maxFields) + + override def mapProductIterator[B](f: Any => B)(implicit evidence$1: ClassTag[B]): Array[B] = { + super.mapProductIterator(f) + } + + override def mapChildren(f: ModularPlan => ModularPlan) : ModularPlan = { + val select = super.mapChildren(f) + if (this.rewritten && !select.rewritten) { + select.setRewritten() + } + select + } +} + +trait unionModularPlan extends ModularPlan { Review comment: please change all trait names, according to above @kunal642 comment ########## File path: integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/CarbonDataSourceScanHelper.scala ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import org.apache.spark.CarbonInputMetrics +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression => SparkExpression} +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.execution.{ColumnarBatchScan, DataSourceScanExec} +import org.apache.spark.sql.execution.strategy.CarbonPlanHelper +import org.apache.spark.sql.optimizer.CarbonFilters + +import org.apache.carbondata.core.index.IndexFilter +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.scan.expression.Expression +import org.apache.carbondata.core.scan.expression.logical.AndExpression +import org.apache.carbondata.hadoop.CarbonProjection +import org.apache.carbondata.spark.rdd.CarbonScanRDD + +abstract class CarbonDataSourceScanHelper(relation: CarbonDatasourceHadoopRelation, + output: Seq[Attribute], + partitionFilters: Seq[SparkExpression], + pushedDownFilters: Seq[Expression], + pushedDownProjection: CarbonProjection, + directScanSupport: Boolean, + extraRDD: Option[(RDD[InternalRow], Boolean)], + segmentIds: Option[String]) + extends DataSourceScanExec with ColumnarBatchScan { Review comment: please correct the style of class definition, please also check all the places, if I miss somewhere ########## File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala ########## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import java.net.URI +import java.sql.{Date, Timestamp} +import java.time.ZoneId +import javax.xml.bind.DatatypeConverter + +import scala.annotation.tailrec +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.antlr.v4.runtime.tree.TerminalNode +import org.apache.spark.{SparkContext, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} +import org.apache.spark.serializer.Serializer +import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule +import org.apache.spark.sql.catalyst.{CarbonParserUtil, InternalRow, QueryPlanningTracker, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedRelation} +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, ExternalCatalogWithListener} +import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSeq, AttributeSet, Expression, ExprId, NamedExpression, Predicate, ScalaUDF, SortOrder, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide, Optimizer} +import org.apache.spark.sql.catalyst.parser.ParserUtils.operationNotAllowed +import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{BucketSpecContext, ColTypeListContext, CreateTableHeaderContext, LocationSpecContext, PartitionFieldListContext, QueryContext, SkewSpecContext, TablePropertyListContext} +import org.apache.spark.sql.catalyst.plans.{logical, JoinType, QueryPlan} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, InsertIntoStatement, Join, JoinHint, LogicalPlan, OneRowRelation, QualifiedColType, Statistics, SubqueryAlias} +import org.apache.spark.sql.catalyst.plans.physical.SinglePartition +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter} +import org.apache.spark.sql.execution.{ExplainMode, QueryExecution, ShuffledRowRDD, SimpleMode, SparkPlan, SQLExecution, UnaryExecNode} +import org.apache.spark.sql.execution.command.{ExplainCommand, Field, PartitionerField, RefreshTableCommand, TableModel, TableNewProcessor} +import org.apache.spark.sql.execution.command.table.{CarbonCreateTableAsSelectCommand, CarbonCreateTableCommand} +import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FilePartition, PartitionedFile} +import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec +import org.apache.spark.sql.hive.HiveExternalCatalog +import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule, MVRewriteRule} +import org.apache.spark.sql.secondaryindex.optimizer.CarbonSITransformationRule +import org.apache.spark.sql.types.{AbstractDataType, CharType, DataType, Metadata, StringType, StructField, VarcharType} +import org.apache.spark.sql.util.SparkSQLUtil Review comment: there are so many unused imports, please remove and check in other places and also check why the CI didn't fail for this ########## File path: integration/spark/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala ########## @@ -21,21 +21,20 @@ import java.util.concurrent.Callable import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path +import 
org.apache.spark.sql.{CarbonEnv, SparkSession} +import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition, FunctionResourceLoader, GlobalTempViewManager} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier} import org.apache.spark.sql.execution.strategy.{CarbonSourceStrategy, DDLStrategy, DMLStrategy, StreamingTableStrategy} import org.apache.spark.sql.hive.client.HiveClient import org.apache.spark.sql.internal.{SessionState, SQLConf} import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule} import org.apache.spark.sql.parser.CarbonSparkSqlParser -import org.apache.spark.sql.{CarbonEnv, SparkSession} import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema Review comment: revert changes if not required ########## File path: integration/spark/src/main/spark2.3/org/apache/spark/sql/CarbonBoundReference.scala ########## @@ -18,8 +18,8 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.expressions.{Attribute, ExprId, LeafExpression, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.types.DataType Review comment: revert this change ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ########## @@ -544,12 +545,12 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { } protected lazy val explainPlan: Parser[LogicalPlan] = - (EXPLAIN ~> opt(EXTENDED)) ~ start ^^ { - case isExtended ~ logicalPlan => + (EXPLAIN ~> opt(MODE)) ~ start ^^ { + case mode ~ logicalPlan => Review comment: can you please add test cases with different modes? ########## File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/SparkVersionAdapter.scala ########## @@ -0,0 +1,549 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import java.net.URI +import java.sql.{Date, Timestamp} +import java.time.ZoneId +import javax.xml.bind.DatatypeConverter + +import scala.annotation.tailrec +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.antlr.v4.runtime.tree.TerminalNode +import org.apache.spark.{SparkContext, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} +import org.apache.spark.serializer.Serializer +import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule +import org.apache.spark.sql.catalyst.{CarbonParserUtil, InternalRow, QueryPlanningTracker, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedRelation} +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, ExternalCatalogWithListener, SessionCatalog} +import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSeq, AttributeSet, Expression, ExpressionSet, ExprId, NamedExpression, Predicate, ScalaUDF, SortOrder, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide, Optimizer} +import org.apache.spark.sql.catalyst.parser.ParserUtils.operationNotAllowed +import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{BucketSpecContext, ColTypeListContext, CreateTableHeaderContext, LocationSpecContext, PartitionFieldListContext, QueryContext, SkewSpecContext, TablePropertyListContext} +import org.apache.spark.sql.catalyst.plans.{logical, JoinType, QueryPlan} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, InsertIntoStatement, Join, JoinHint, LogicalPlan, OneRowRelation, QualifiedColType, Statistics, SubqueryAlias} +import org.apache.spark.sql.catalyst.plans.physical.SinglePartition +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode} +import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter} +import org.apache.spark.sql.execution.{ExplainMode, QueryExecution, ShuffledRowRDD, SimpleMode, SparkPlan, SQLExecution, UnaryExecNode} Review comment: many unused imports, please remove ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala ########## @@ -298,7 +301,7 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll { for (i <- 0 until 10) { sql(s"alter table addsegment1 add segment " + s"options('path'='${ newPath + i }', 'format'='carbon')").collect() - + sql("select count(*) from addsegment1").show() Review comment: revert this ########## File path: integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala ########## @@ -1551,26 +1552,28 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll { sql("insert into hive_table select 'a','b','1'"); } - val e2 = intercept[SparkException] { - sql("insert into hive_table2 select 'a','b','binary'"); - } + if (!SparkUtil.isSparkVersionXAndAbove("3")) { + val e2 = intercept[SparkException] { + sql("insert into hive_table2 select 'a','b','binary'"); + } - assert(e2.getMessage.contains( - "Dynamic partition strict mode requires at least one static partition column")) 
+ assert(e2.getMessage.contains( + "Dynamic partition strict mode requires at least one static partition column")) - val eInt2 = intercept[Exception] { - sql("insert into hive_table2 select 'a','b','1'"); - } + val eInt2 = intercept[Exception] { + sql("insert into hive_table2 select 'a','b','1'"); + } - val e3 = intercept[SparkException] { - sql("insert into parquet_table select 'a','b','binary'"); - } + val e3 = intercept[SparkException] { + sql("insert into parquet_table select 'a','b','binary'"); + } - assert(e3.getMessage.contains( - "Dynamic partition strict mode requires at least one static partition column")) + assert(e3.getMessage.contains( + "Dynamic partition strict mode requires at least one static partition column")) Review comment: revert all the above space changes ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala ########## @@ -100,6 +100,7 @@ class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll { test("clean up table and test trash folder with Marked For Delete and Compacted segments") { // do not send MFD folders to trash createTable() + sql(s"""Show Tables """).show() Review comment: revert this ########## File path: mv/plan/src/main/common2.3and2.4/org/apache/carbondata/mv/plans/modular/SparkVersionHelper.scala ########## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.mv.plans.modular + +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSeq, Expression, ExprId, NamedExpression, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.optimizer.{BooleanSimplification, CollapseProject, CollapseRepartition, CollapseWindow, ColumnPruning, CombineFilters, CombineLimits, CombineUnions, ConstantFolding, EliminateOuterJoin, EliminateSerialization, EliminateSorts, FoldablePropagation, NullPropagation, PushDownPredicate, PushPredicateThroughJoin, PushProjectionThroughUnion, RemoveDispensableExpressions, RemoveRedundantAliases, RemoveRedundantProject, ReorderAssociativeOperator, ReorderJoin, RewriteCorrelatedScalarSubquery, SimplifyBinaryComparison, SimplifyCaseConversionExpressions, SimplifyCasts, SimplifyConditionals} +import org.apache.spark.sql.catalyst.plans.{logical, JoinType, QueryPlan} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Join, LogicalPlan, Statistics, Subquery} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.types.{DataType, Metadata} + +import org.apache.carbondata.mv.plans.util.BirdcageOptimizer + + +object SparkVersionHelper { + + def getStatisticsObj(outputList: Seq[NamedExpression], + plan: LogicalPlan, stats: Statistics, + aliasMap: Option[AttributeMap[Attribute]] = None): Statistics = { + val output = outputList.map(_.toAttribute) Review comment: please correct the style and do for all code of class ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala ########## @@ -842,7 +842,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll { Row("d", "3") ).asJava, StructType(Seq(StructField("key", StringType), StructField("value", StringType)))) - initframe.write + initframe.repartition(1).write Review comment: why this change ########## File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/SparkVersionAdapter.scala ########## @@ -0,0 +1,549 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import java.net.URI +import java.sql.{Date, Timestamp} +import java.time.ZoneId +import javax.xml.bind.DatatypeConverter + +import scala.annotation.tailrec +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.antlr.v4.runtime.tree.TerminalNode +import org.apache.spark.{SparkContext, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} +import org.apache.spark.serializer.Serializer +import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule +import org.apache.spark.sql.catalyst.{CarbonParserUtil, InternalRow, QueryPlanningTracker, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedRelation} +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, ExternalCatalogWithListener, SessionCatalog} +import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSeq, AttributeSet, Expression, ExpressionSet, ExprId, NamedExpression, Predicate, ScalaUDF, SortOrder, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide, Optimizer} +import org.apache.spark.sql.catalyst.parser.ParserUtils.operationNotAllowed +import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{BucketSpecContext, ColTypeListContext, CreateTableHeaderContext, LocationSpecContext, PartitionFieldListContext, QueryContext, SkewSpecContext, TablePropertyListContext} +import org.apache.spark.sql.catalyst.plans.{logical, JoinType, QueryPlan} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, InsertIntoStatement, Join, JoinHint, LogicalPlan, OneRowRelation, QualifiedColType, Statistics, SubqueryAlias} +import org.apache.spark.sql.catalyst.plans.physical.SinglePartition +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode} +import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter} +import org.apache.spark.sql.execution.{ExplainMode, QueryExecution, ShuffledRowRDD, SimpleMode, SparkPlan, SQLExecution, UnaryExecNode} +import org.apache.spark.sql.execution.command.{ExplainCommand, Field, PartitionerField, RefreshTableCommand, TableModel, TableNewProcessor} +import org.apache.spark.sql.execution.command.table.{CarbonCreateTableAsSelectCommand, CarbonCreateTableCommand} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec +import org.apache.spark.sql.execution.metric.SQLShuffleWriteMetricsReporter +import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan +import org.apache.spark.sql.internal.{SessionState, SharedState} +import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule, MVRewriteRule} +import org.apache.spark.sql.parser.CarbonSpark2SqlParser +import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil.{checkIfDuplicateColumnExists, convertDbNameToLowerCase, validateStreamingProperty} +import org.apache.spark.sql.secondaryindex.optimizer.CarbonSITransformationRule +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.{AbstractDataType, CharType, DataType, Metadata, StringType, StructField, VarcharType} 
+import org.apache.spark.sql.util.SparkSQLUtil +import org.apache.spark.unsafe.types.UTF8String + +import org.apache.carbondata.common.exceptions.DeprecatedFeatureException +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.datatype.DataTypes +import org.apache.carbondata.core.metadata.schema.SchemaReader +import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.geo.{InPolygonJoinUDF, ToRangeListAsStringUDF} +import org.apache.carbondata.spark.CarbonOption +import org.apache.carbondata.spark.util.CarbonScalaUtil + +object SparkVersionAdapter { + + def getExplainCommandObj(logicalPlan: LogicalPlan = OneRowRelation(), + mode: Option[String]) : ExplainCommand = { + ExplainCommand(logicalPlan, ExplainMode.fromString(mode.getOrElse(SimpleMode.name))) + } + + def getExplainCommandObj(mode: Option[String]) : ExplainCommand = { + ExplainCommand(OneRowRelation(), ExplainMode.fromString(mode.getOrElse(SimpleMode.name))) + } + + /** + * As a part of SPARK-24085 Hive tables supports scala subquery for + * the partitioned tables,so Carbon also needs to supports + * @param partitionSet + * @param filterPredicates + * @return + */ + def getPartitionFilter( + partitionSet: AttributeSet, + filterPredicates: Seq[Expression]): Seq[Expression] = { + filterPredicates + .filterNot(SubqueryExpression.hasSubquery) + .filter { filter => + filter.references.nonEmpty && filter.references.subsetOf(partitionSet) + } + } + + def getDataFilter(partitionSet: AttributeSet, filter: Seq[Expression]): Seq[Expression] = { + filter + } + + // As per SPARK-22520 OptimizeCodegen is removed in 2.3.1 + def getOptimizeCodegenRule(): Seq[Rule[LogicalPlan]] = { + Seq.empty + } + + def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat, + map: Map[String, String], + tablePath: String): CatalogStorageFormat = { + storageFormat.copy(properties = map, locationUri = Some(new URI(tablePath))) + } + + def getOutput(subQueryAlias: SubqueryAlias): Seq[Attribute] = { + val newAlias = Seq(subQueryAlias.identifier.name) + subQueryAlias.child.output.map(_.withQualifier(newAlias)) + } + + def stringToTimestamp(timestamp: String): Option[Long] = { + DateTimeUtils.stringToTimestamp(UTF8String.fromString(timestamp), ZoneId.systemDefault()) + } + + def stringToTime(value: String): java.util.Date = { + stringToDateValue(value) + } + + def getPredicate(inputSchema: Seq[Attribute], + condition: Option[Expression]): InternalRow => Boolean = { + Predicate.create(condition.get, inputSchema).eval _ + } + + @tailrec + private def stringToDateValue(value: String): java.util.Date = { + val indexOfGMT = value.indexOf("GMT") + if (indexOfGMT != -1) { + // ISO8601 with a weird time zone specifier (2000-01-01T00:00GMT+01:00) + val s0 = value.substring(0, indexOfGMT) + val s1 = value.substring(indexOfGMT + 3) + // Mapped to 2000-01-01T00:00+01:00 + stringToDateValue(s0 + s1) + } else if (!value.contains('T')) { + // JDBC escape string + if (value.contains(' ')) { + Timestamp.valueOf(value) + } else { + Date.valueOf(value) + } + } else { + DatatypeConverter.parseDateTime(value).getTime + } + } + + def timeStampToString(timeStamp: Long): String = { + 
TimestampFormatter.getFractionFormatter(ZoneId.systemDefault()).format(timeStamp) + } + + def dateToString(date: Int): String = { + DateTimeUtils.daysToLocalDate(date).toString + } + + def addTaskCompletionListener[U](f: => U) { + TaskContext.get().addTaskCompletionListener[Unit] { context => + f + } + } + + def getTableIdentifier(u: UnresolvedRelation): Some[TableIdentifier] = { + val tableName = u.tableName.split("\\.") + if (tableName.size == 2) { + Some(TableIdentifier(tableName(1), Option(tableName(0)))) + } else { + val currentDatabase = SparkSQLUtil.getSparkSession.sessionState.catalog.getCurrentDatabase + Some(TableIdentifier(tableName(0), Option(currentDatabase))) + } + } + + def createRangeListScalaUDF(toRangeListUDF: ToRangeListAsStringUDF, + dataType: StringType.type, + children: Seq[Expression], + inputTypes: Seq[DataType]): ScalaUDF = { + ScalaUDF(toRangeListUDF, + dataType, + children, + Nil, + None, + Some("ToRangeListAsString")) + } + + def getTransformedPolygonJoinUdf(scalaUdf: ScalaUDF, + udfChildren: Seq[Expression], + polygonJoinUdf: InPolygonJoinUDF): ScalaUDF = { + ScalaUDF(polygonJoinUdf, + scalaUdf.dataType, + udfChildren, + scalaUdf.inputEncoders, + scalaUdf.outputEncoder, + scalaUdf.udfName) + } + + def getTableIdentifier(parts: Seq[String]): TableIdentifier = { + if (parts.length == 1) { + TableIdentifier(parts.head, None) + } else { + TableIdentifier(parts(1), Option(parts.head)) + } + } + + def createShuffledRowRDD(sparkContext: SparkContext, localTopK: RDD[InternalRow], + child: SparkPlan, serializer: Serializer): ShuffledRowRDD = { + val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) + new ShuffledRowRDD( + ShuffleExchangeExec.prepareShuffleDependency( + localTopK, child.output, SinglePartition, serializer, writeMetrics), writeMetrics) + } + + def invokeAnalyzerExecute(analyzer: Analyzer, + plan: LogicalPlan): LogicalPlan = { + analyzer.executeAndCheck(plan, QueryPlanningTracker.get.getOrElse(new QueryPlanningTracker)) + } + + def normalizeExpressions[T <: Expression](r: T, attrs: AttributeSeq): T = { + QueryPlan.normalizeExpressions(r, attrs) + } + + def getBuildRight: BuildSide = { + BuildRight + } + + def getBuildLeft: BuildSide = { + BuildLeft + } + + def withNewExecutionId[T](sparkSession: SparkSession, queryExecution: QueryExecution): T => T = { + SQLExecution.withNewExecutionId(queryExecution, None)(_) + } + + def createJoinNode(child: LogicalPlan, + targetTable: LogicalPlan, + joinType: JoinType, + condition: Option[Expression]): Join = { + Join(child, targetTable, joinType, condition, JoinHint.NONE) + } + + def getPartitionsFromInsert(x: InsertIntoStatementWrapper): Map[String, Option[String]] = { + x.partitionSpec + } + + type CarbonBuildSideType = BuildSide + type InsertIntoStatementWrapper = InsertIntoStatement + + def createRefreshTableCommand(tableIdentifier: TableIdentifier): RefreshTableCommand = { + RefreshTableCommand(tableIdentifier) + } + + type RefreshTables = RefreshTableCommand + + /** + * Validates the partition columns and return's A tuple of partition columns and partitioner + * fields. + * + * @param partitionColumns An instance of ColTypeListContext having parser rules for + * column. + * @param colNames <Seq[String]> Sequence of Table column names. + * @param tableProperties <Map[String, String]> Table property map. + * @param partitionByStructFields Seq[StructField] Sequence of partition fields. + * @return <Seq[PartitionerField]> A Seq of partitioner fields. 
+ */ + def validatePartitionFields( + partitionColumns: PartitionFieldListContext, + colNames: Seq[String], + tableProperties: mutable.Map[String, String], + partitionByStructFields: Seq[StructField]): Seq[PartitionerField] = { + + val partitionerFields = partitionByStructFields.map { structField => + PartitionerField(structField.name, Some(structField.dataType.toString), null) + } + // validate partition clause + if (partitionerFields.nonEmpty) { + // partition columns should not be part of the schema + val badPartCols = partitionerFields.map(_.partitionColumn.toLowerCase).toSet + .intersect(colNames.map(_.toLowerCase).toSet) + if (badPartCols.nonEmpty) { + operationNotAllowed(s"Partition columns should not be specified in the schema: " + + badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]") + , partitionColumns: PartitionFieldListContext) + } + } + partitionerFields + } + + /** + * The method validates the create table command and returns the create table or + * ctas table LogicalPlan. + * + * @param createTableTuple a tuple of (CreateTableHeaderContext, SkewSpecContext, + * BucketSpecContext, PartitionFieldListContext, ColTypeListContext, + * TablePropertyListContext, + * LocationSpecContext, Option[String], TerminalNode, QueryContext, + * String) + * @param extraTableTuple A tuple of (Seq[StructField], Boolean, TableIdentifier, Boolean, + * Seq[String], + * Option[String], mutable.Map[String, String], Map[String, String], + * Seq[StructField], + * Seq[PartitionerField], CarbonSpark2SqlParser, SparkSession, + * Option[LogicalPlan]) + * @return <LogicalPlan> of create table or ctas table + * + */ + def createCarbonTable(createTableTuple: (CreateTableHeaderContext, SkewSpecContext, + BucketSpecContext, PartitionFieldListContext, ColTypeListContext, TablePropertyListContext, + LocationSpecContext, Option[String], TerminalNode, QueryContext, String), + extraTableTuple: (Seq[StructField], Boolean, TableIdentifier, Boolean, Seq[String], + Option[String], mutable.Map[String, String], Map[String, String], Seq[StructField], + Seq[PartitionerField], CarbonSpark2SqlParser, SparkSession, + Option[LogicalPlan])): LogicalPlan = { + val (tableHeader, skewSpecContext, bucketSpecContext, partitionColumns, columns, + tablePropertyList, locationSpecContext, tableComment, ctas, query, provider) = createTableTuple + val (cols, external, tableIdentifier, ifNotExists, colNames, tablePath, + tableProperties, properties, partitionByStructFields, partitionFields, + parser, sparkSession, selectQuery) = extraTableTuple + val options = new CarbonOption(properties) + // validate streaming property + validateStreamingProperty(options) + var fields = parser.getFields(cols ++ partitionByStructFields) + // validate for create table as select + selectQuery match { + case Some(q) => + // create table as select does not allow creation of partitioned table + if (partitionFields.nonEmpty) { + val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " + + "create a partitioned table using Carbondata file formats." 
+ operationNotAllowed(errorMessage, partitionColumns) + } + // create table as select does not allow to explicitly specify schema + if (fields.nonEmpty) { + operationNotAllowed( + "Schema may not be specified in a Create Table As Select (CTAS) statement", columns) + } + // external table is not allow + if (external) { + operationNotAllowed("Create external table as select", tableHeader) + } + fields = parser + .getFields(CarbonEnv.getInstance(sparkSession).carbonMetaStore + .getSchemaFromUnresolvedRelation(sparkSession, Some(q).get)) + case _ => + // ignore this case + } + val columnNames = fields.map(_.name.get) + checkIfDuplicateColumnExists(columns, tableIdentifier, columnNames) + if (partitionFields.nonEmpty && options.isStreaming) { + operationNotAllowed("Streaming is not allowed on partitioned table", partitionColumns) + } + + if (!external && fields.isEmpty) { + throw new MalformedCarbonCommandException("Creating table without column(s) is not supported") + } + if (external && fields.isEmpty && tableProperties.nonEmpty) { + // as fields are always zero for external table, cannot validate table properties. + operationNotAllowed( + "Table properties are not supported for external table", tablePropertyList) + } + + // Global dictionary is deprecated since 2.0 + if (tableProperties.contains(CarbonCommonConstants.DICTIONARY_INCLUDE) || + tableProperties.contains(CarbonCommonConstants.DICTIONARY_EXCLUDE)) { + DeprecatedFeatureException.globalDictNotSupported() + } + + val bucketFields = parser.getBucketFields(tableProperties, fields, options) + var isTransactionalTable: Boolean = true + + val tableInfo = if (external) { + if (fields.nonEmpty) { + // user provided schema for this external table, this is not allow currently + // see CARBONDATA-2866 + operationNotAllowed( + "Schema must not be specified for external table", columns) + } + if (partitionByStructFields.nonEmpty) { + operationNotAllowed( + "Partition is not supported for external table", partitionColumns) + } + // read table info from schema file in the provided table path + // external table also must convert table name to lower case + val identifier = AbsoluteTableIdentifier.from( + tablePath.get, + CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession).toLowerCase(), + tableIdentifier.table.toLowerCase()) + val table = try { + val schemaPath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath) + if (!FileFactory.isFileExist(schemaPath)) { + if (provider.equalsIgnoreCase("'carbonfile'")) { + SchemaReader.inferSchema(identifier, true) + } else { + isTransactionalTable = false + SchemaReader.inferSchema(identifier, false) + } + } else { + SchemaReader.getTableInfo(identifier) + } + } catch { + case e: Throwable => + operationNotAllowed(s"Invalid table path provided: ${ tablePath.get } ", tableHeader) + } + + // set "_external" property, so that DROP TABLE will not delete the data + if (provider.equalsIgnoreCase("'carbonfile'")) { + table.getFactTable.getTableProperties.put("_filelevelformat", "true") + table.getFactTable.getTableProperties.put("_external", "false") + } else { + table.getFactTable.getTableProperties.put("_external", "true") + table.getFactTable.getTableProperties.put("_filelevelformat", "false") + } + var isLocalDic_enabled = table.getFactTable.getTableProperties + .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE) + if (null == isLocalDic_enabled) { + table.getFactTable.getTableProperties + .put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE, + CarbonProperties.getInstance() + 
.getProperty(CarbonCommonConstants.LOCAL_DICTIONARY_SYSTEM_ENABLE, + CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT)) + } + isLocalDic_enabled = table.getFactTable.getTableProperties + .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE) + if (CarbonScalaUtil.validateLocalDictionaryEnable(isLocalDic_enabled) && + isLocalDic_enabled.toBoolean) { + val allColumns = table.getFactTable.getListOfColumns + for (i <- 0 until allColumns.size()) { + val cols = allColumns.get(i) + if (cols.getDataType == DataTypes.STRING || cols.getDataType == DataTypes.VARCHAR) { + cols.setLocalDictColumn(true) + } + } + table.getFactTable.setListOfColumns(allColumns) + } + table + } else { + // prepare table model of the collected tokens + val tableModel: TableModel = CarbonParserUtil.prepareTableModel( + ifNotExists, + convertDbNameToLowerCase(tableIdentifier.database), + tableIdentifier.table.toLowerCase, + fields, + partitionFields, + tableProperties, + bucketFields, + isAlterFlow = false, + tableComment) + TableNewProcessor(tableModel) + } + tableInfo.setTransactionalTable(isTransactionalTable) + selectQuery match { + case query@Some(q) => + CarbonCreateTableAsSelectCommand( + tableInfo = tableInfo, + query = query.get, + ifNotExistsSet = ifNotExists, + tableLocation = tablePath) + case _ => + CarbonCreateTableCommand( + tableInfo = tableInfo, + ifNotExistsSet = ifNotExists, + tableLocation = tablePath, + external) + } + } + + def getField(parser: CarbonSpark2SqlParser, + schema: Seq[QualifiedColType], isExternal: Boolean = false): Seq[Field] = { + schema.map { col => + parser.getFields(col.comment, col.name.head, col.dataType, isExternal) + } + } + + def supportsBatchOrColumnar(scan: CarbonDataSourceScan): Boolean = { + scan.supportsColumnar + } + + def createDataset(sparkSession: SparkSession, qe: QueryExecution) : Dataset[Row] = { + new Dataset[Row](qe, RowEncoder(qe.analyzed.schema)) + } + + def createSharedState(sparkContext: SparkContext) : SharedState = { + new SharedState(sparkContext, Map.empty[String, String]) + } + + def translateFilter(dataFilters: Seq[Expression]) : Seq[Filter] = { + dataFilters.flatMap(DataSourceStrategy.translateFilter(_, + supportNestedPredicatePushdown = false)) + } + + def getCarbonOptimizer(session: SparkSession, + sessionState: SessionState): CarbonOptimizer = { + new CarbonOptimizer(session, sessionState.optimizer) + } + + def isCharType(dataType: DataType): Boolean = { + dataType.isInstanceOf[CharType] + } + + def isVarCharType(dataType: DataType): Boolean = { + dataType.isInstanceOf[VarcharType] + } + + def getTypeName(s: AbstractDataType): String = { + s.defaultConcreteType.typeName + } + + def getInsertIntoCommand(table: LogicalPlan, + partition: Map[String, Option[String]], + query: LogicalPlan, + overwrite: Boolean, + ifPartitionNotExists: Boolean): InsertIntoStatement = { + InsertIntoStatement( + table, + partition, + Nil, + query, + overwrite, + ifPartitionNotExists) + } +} + +case class CarbonBuildSide(buildSide: BuildSide) { + def isRight: Boolean = buildSide.isInstanceOf[BuildRight.type] + def isLeft: Boolean = buildSide.isInstanceOf[BuildLeft.type] +} + +abstract class CarbonTakeOrderedAndProjectExecHelper(sortOrder: Seq[SortOrder], + limit: Int, skipMapOrder: Boolean, readFromHead: Boolean) extends UnaryExecNode { + override def simpleString(maxFields: Int): String = { + val orderByString = sortOrder.mkString("[", ",", "]") + val outputString = output.mkString("[", ",", "]") + + s"CarbonTakeOrderedAndProjectExec(limit=$limit, orderBy=$orderByString, " + 
+ s"skipMapOrder=$skipMapOrder, readFromHead=$readFromHead, output=$outputString)" + } +} Review comment: add a new line after class, all these should be caught in CI, please check once ########## File path: mv/plan/src/main/spark2.3/org/apache/carbondata/mv/plans/modular/ExpressionHelper.scala ########## @@ -46,5 +46,4 @@ object ExpressionHelper { def getTheLastQualifier(reference: AttributeReference): String = { reference.qualifier.head } - } Review comment: Please revert this class changes, not needed ########## File path: scalastyle-config.xml ########## @@ -203,19 +203,6 @@ This file is divided into 3 sections: ]]></customMessage> </check> - <check customId="awaitresult" level="error" class="org.scalastyle.file.RegexChecker" enabled="true"> - <parameters> - <parameter name="regex">Await\.result</parameter> - </parameters> - <customMessage><![CDATA[ - Are you sure that you want to use Await.result? In most cases, you should use ThreadUtils.awaitResult instead. - If you must use Await.result, wrap the code block with - // scalastyle:off awaitresult - Await.result(...) - // scalastyle:on awaitresult - ]]></customMessage> - </check> - Review comment: why this removed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
vikramahuja1001 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r647101196 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ########## @@ -544,12 +545,12 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { } protected lazy val explainPlan: Parser[LogicalPlan] = - (EXPLAIN ~> opt(EXTENDED)) ~ start ^^ { - case isExtended ~ logicalPlan => + (EXPLAIN ~> opt(MODE)) ~ start ^^ { + case mode ~ logicalPlan => Review comment: Carbon does not support any other modes at the moment. Have created a JIRA for that. Refer: https://issues.apache.org/jira/browse/CARBONDATA-4201 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
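A note for readers following the Spark 3.1 side of this change: the parser now captures an optional mode keyword instead of only EXTENDED, and the version adapter turns that keyword into Spark's ExplainMode, falling back to the simple plan output when none is given. The following is a minimal sketch of that resolution step, assuming Spark 3.1's org.apache.spark.sql.execution.ExplainMode API; the sketch object and the sample mode strings are illustrative and not part of the PR.

// Kept under org.apache.spark.sql, like the adapter, so any package-private Spark APIs stay accessible.
package org.apache.spark.sql.sketch

import org.apache.spark.sql.execution.{ExplainMode, SimpleMode}

object ExplainModeSketch {
  // Mirrors what getExplainCommandObj does with the parsed token:
  // no mode keyword means the plain (simple) plan output.
  def resolve(mode: Option[String]): ExplainMode =
    ExplainMode.fromString(mode.getOrElse(SimpleMode.name))

  def main(args: Array[String]): Unit = {
    println(resolve(None))              // simple mode (the default)
    println(resolve(Some("extended")))  // extended mode
    println(resolve(Some("codegen")))   // codegen mode
  }
}

A keyword outside the set Spark 3.1 knows about is rejected by ExplainMode.fromString, which lines up with the comment above that Carbon only wires up the default behaviour for now.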
vikramahuja1001 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r647101468 ########## File path: integration/flink/pom.xml ########## @@ -30,6 +30,11 @@ <artifactId>carbondata-format</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>com.thoughtworks.paranamer</groupId> Review comment: done ########## File path: integration/flink/pom.xml ########## @@ -193,9 +220,7 @@ <profiles> <profile> <id>spark-2.3</id> - <activation> - <activeByDefault>true</activeByDefault> - </activation> + <activation><activeByDefault>true</activeByDefault></activation> Review comment: done ########## File path: streaming/src/main/spark2.x/org.apache.carbondata.util/SparkStreamingUtil.scala ########## @@ -15,14 +15,15 @@ * limitations under the License. */ -package org.apache.carbondata.spark.adapter +package org.apache.carbondata.util -import scala.collection.mutable.ArrayBuffer +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} +object SparkStreamingUtil { Review comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
vikramahuja1001 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r647101730 ########## File path: mv/plan/src/main/spark2.4/org/apache/carbondata/mv/plans/modular/ExpressionHelper.scala ########## @@ -17,9 +17,11 @@ package org.apache.carbondata.mv.plans.modular -import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, ExprId, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSeq, Expression, ExprId, NamedExpression, SubqueryExpression} Review comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
vikramahuja1001 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r647101776 ########## File path: mv/plan/src/main/spark3.1/org/apache/carbondata/mv/plans/modular/SparkVersionHelper.scala ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.mv.plans.modular + +import scala.reflect.ClassTag + +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeSeq, Expression, ExprId, NamedExpression, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, AggregateMode} +import org.apache.spark.sql.catalyst.optimizer.{BooleanSimplification, CollapseProject, CollapseRepartition, CollapseWindow, ColumnPruning, CombineFilters, CombineUnions, ConstantFolding, EliminateLimits, EliminateOuterJoin, EliminateSerialization, EliminateSorts, FoldablePropagation, NullPropagation, PushDownPredicates, PushPredicateThroughJoin, PushProjectionThroughUnion, RemoveDispensableExpressions, RemoveRedundantAliases, ReorderAssociativeOperator, ReorderJoin, RewriteCorrelatedScalarSubquery, SimplifyBinaryComparison, SimplifyCaseConversionExpressions, SimplifyCasts, SimplifyConditionals} +import org.apache.spark.sql.catalyst.plans.{logical, JoinType, QueryPlan} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Join, LogicalPlan, Statistics, Subquery} +import org.apache.spark.sql.catalyst.rules.Rule + +import org.apache.carbondata.mv.plans.util.BirdcageOptimizer + +object SparkVersionHelper { + def getStatisticsObj(outputList: Seq[NamedExpression], + plan: LogicalPlan, stats: Statistics, + aliasMap: Option[AttributeMap[Attribute]] = None): Statistics = { + val output = outputList.map(_.toAttribute) + val mapSeq = plan.collect { case n: logical.LeafNode => n }.map { + table => AttributeMap(table.output.zip(output)) + } + val rewrites = mapSeq.head + val attributes: AttributeMap[ColumnStat] = stats.attributeStats + var attributeStats = AttributeMap(attributes.iterator + .map { pair => (rewrites(pair._1), pair._2) }.toSeq) + if (aliasMap.isDefined) { + attributeStats = AttributeMap( + attributeStats.map(pair => (aliasMap.get(pair._1), pair._2)).toSeq) + } + Statistics(stats.sizeInBytes, stats.rowCount, attributeStats) + } + + def getOptimizedPlan(s: SubqueryExpression): LogicalPlan = { + val Subquery(newPlan, _) = BirdcageOptimizer.execute(Subquery.fromExpression(s)) + newPlan + } + + def normalizeExpressions[T <: Expression](r: T, attrs: AttributeSeq): T = { + QueryPlan.normalizeExpressions(r, attrs) + } + + def attributeMap(rAliasMap: AttributeMap[Attribute]) : AttributeMap[Alias] = { + rAliasMap.asInstanceOf[AttributeMap[Alias]] + } + + def seqOfRules 
: Seq[Rule[LogicalPlan]] = { + Seq( + // Operator push down + PushProjectionThroughUnion, + ReorderJoin, + EliminateOuterJoin, + PushPredicateThroughJoin, + PushDownPredicates, + ColumnPruning, + // Operator combine + CollapseRepartition, + CollapseProject, + CollapseWindow, + CombineFilters, + EliminateLimits, + CombineUnions, + // Constant folding and strength reduction + NullPropagation, + FoldablePropagation, + ConstantFolding, + ReorderAssociativeOperator, + // No need to apply LikeSimplification rule while creating MV + // as modular plan asCompactSql will be set in schema + // LikeSimplification, + BooleanSimplification, + SimplifyConditionals, + RemoveDispensableExpressions, + SimplifyBinaryComparison, + EliminateSorts, + SimplifyCasts, + SimplifyCaseConversionExpressions, + RewriteCorrelatedScalarSubquery, + EliminateSerialization, + RemoveRedundantAliases) + } + +} + +trait getVerboseString extends LeafNode { + def verboseString: String = toString +} + +trait groupByUnaryNode extends UnaryNode { + override def verboseString(maxFields: Int): String = super.verboseString(maxFields) + + override def mapProductIterator[B](f: Any => B)(implicit evidence$1: ClassTag[B]): Array[B] = { + super.mapProductIterator(f) + } + + override def mapChildren(f: ModularPlan => ModularPlan) : ModularPlan = { + val groupBy = super.mapChildren(f) + if (this.rewritten && !groupBy.rewritten) { + groupBy.setRewritten() + } + groupBy + } +} + +trait selectModularPlan extends ModularPlan { Review comment: done ########## File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala ########## @@ -232,9 +236,9 @@ object CarbonToSparkAdapter { } class CarbonOptimizer( - session: SparkSession, - catalog: SessionCatalog, - optimizer: Optimizer) extends Optimizer(catalog) { + session: SparkSession, Review comment: done ########## File path: integration/spark/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala ########## @@ -187,11 +193,10 @@ object CarbonToSparkAdapter { } } - class CarbonOptimizer( - session: SparkSession, - catalog: SessionCatalog, - optimizer: Optimizer) extends Optimizer(catalog) { + session: SparkSession, Review comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
vikramahuja1001 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r647102389 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala ########## @@ -72,7 +72,7 @@ abstract class CarbonRDD[T: ClassTag]( def internalCompute(split: Partition, context: TaskContext): Iterator[T] final def compute(split: Partition, context: TaskContext): Iterator[T] = { - TaskContext.get.addTaskCompletionListener(_ => ThreadLocalSessionInfo.unsetAll()) + SparkVersionAdapter.addTaskCompletionListener(ThreadLocalSessionInfo.unsetAll()) Review comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
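To make the CarbonRDD change above concrete: the adapter method shown earlier in SparkVersionAdapter wraps Spark's TaskContext listener registration behind a by-name parameter, so call sites can pass a plain expression such as the thread-local cleanup. Below is a minimal sketch of that wrapper, assuming only the public TaskContext API; cleanupThreadLocals() is a hypothetical stand-in for the real cleanup call, not a Carbon API.

import org.apache.spark.TaskContext

object TaskListenerSketch {
  // By-name parameter: f is evaluated only when the task finishes.
  // The explicit [Unit] type argument picks the function overload of
  // addTaskCompletionListener, the same variant used in SparkVersionAdapter.
  def addTaskCompletionListener[U](f: => U): Unit = {
    TaskContext.get().addTaskCompletionListener[Unit] { _ => f }
  }

  // Hypothetical stand-in for the actual thread-local cleanup.
  def cleanupThreadLocals(): Unit = println("clearing thread-local session state")
}

// Inside a running task (for example, at the start of a compute() implementation):
//   TaskListenerSketch.addTaskCompletionListener(TaskListenerSketch.cleanupThreadLocals())

Because the argument is by-name, nothing runs at registration time; the expression only executes in the completion callback, which is what makes the one-line call in CarbonRDD safe.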