vikramahuja1001 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r645298630 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/util/CreateTableCommonUtil.scala ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.util + +import org.apache.log4j.Logger +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils} +import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation} +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.sql.sources.BaseRelation +import org.apache.spark.sql.types.StructType + +object CreateTableCommonUtil { + + def getNewTable(sparkSession: SparkSession, sessionState: SessionState, Review comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
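The helper above is the common CatalogTable preparation that the per-version CreateDataSourceTableCommand wrappers (shown further down in this thread) call as getNewTable(sparkSession, sessionState, table, LOGGER). A minimal sketch of what such a helper can look like, assuming it resolves the data source to pick up the inferred schema; judging by the file's imports, the real implementation likely also handles HadoopFsRelation partition columns and default table locations:

object CreateTableCommonUtil {

  def getNewTable(sparkSession: SparkSession, sessionState: SessionState,
      table: CatalogTable, LOGGER: Logger): CatalogTable = {
    // Resolve the underlying data source so the catalog entry carries
    // the schema actually inferred and validated by the source.
    val dataSource = DataSource(
      sparkSession = sparkSession,
      className = table.provider.get,
      userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
      partitionColumns = table.partitionColumnNames,
      bucketSpec = table.bucketSpec,
      options = table.storage.properties)
    val relation: BaseRelation = dataSource.resolveRelation(checkFilesExist = false)
    LOGGER.info(s"Resolved data source relation for ${table.identifier}")
    // Return a catalog table that carries the resolved schema.
    table.copy(schema = relation.schema)
  }
}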
vikramahuja1001 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r645300638 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala ########## @@ -360,9 +359,14 @@ object CarbonStore { private def validateTimeFormat(timestamp: String): Long = { try { - DateTimeUtils.stringToTimestamp(UTF8String.fromString(timestamp)).get + CarbonToSparkAdapter.stringToTimestamp(timestamp) match { + case Some(value) => value + case _ => + val errorMessage = "Error: Invalid load start time format: " + timestamp Review comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
vikramahuja1001 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r645301549 ########## File path: integration/spark/src/main/spark2.3/org/apache/spark/sql/execution/CarbonCodegenSupport.scala ########## @@ -14,15 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.spark.sql.execution -package org.apache.carbondata.spark.adapter +import org.apache.spark.sql.execution.joins.HashJoin -import scala.collection.mutable.ArrayBuffer +trait CarbonCodegenSupport extends SparkPlan with HashJoin { Review comment: removed hashjoin from BroadCastSIfilterPushjoin -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
vikramahuja1001 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r645302121 ########## File path: integration/spark/src/main/spark2.3/org/apache/spark/sql/execution/CreateDataSourceTableCommand.scala ########## @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.catalog.CatalogTable + +object CreateDataSourceTableCommand { Review comment: added to common code -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
vikramahuja1001 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r645302372 ########## File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/CarbonBoundReference.scala ########## @@ -42,4 +42,4 @@ case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nu override def qualifier: Seq[String] = null override def newInstance(): NamedExpression = throw new UnsupportedOperationException -} \ No newline at end of file +} Review comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
vikramahuja1001 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r645302566 ########## File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/execution/CreateDataSourceTableCommand.scala ########## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.log4j.Logger +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils} +import org.apache.spark.sql.execution.command.RunnableCommand +import org.apache.spark.sql.util.CreateTableCommonUtil.getNewTable + +import org.apache.carbondata.common.logging.LogServiceFactory + +case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boolean) + extends RunnableCommand { + + val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName) + + override def run(sparkSession: SparkSession): Seq[Row] = { + assert(table.tableType != CatalogTableType.VIEW) + assert(table.provider.isDefined) + val sessionState = sparkSession.sessionState + if (sessionState.catalog.tableExists(table.identifier)) { + if (ignoreIfExists) { + return Seq.empty[Row] + } else { + throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.") + } + } + val newTable: CatalogTable = getNewTable(sparkSession, sessionState, table, LOGGER) + + // We will return Nil or throw exception at the beginning if the table already exists, so when + // we reach here, the table should not exist and we should set `ignoreIfExists` to false. + sessionState.catalog.createTable(newTable, ignoreIfExists = false, validateLocation = false) + Seq.empty[Row] + } +} + +object CreateDataSourceTableCommand { Review comment: It is common to 3.1, combined it with 3.1 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
vikramahuja1001 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r644521888 ########## File path: core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java ########## @@ -439,10 +445,31 @@ public static Object getDataDataTypeForNoDictionaryColumn(String dimensionValue, } } + private static long createTimeInstant(String dimensionValue, String dateFormat) { + String updatedDim = dimensionValue; + if (!dimensionValue.trim().contains(" ")) { + updatedDim += " 00:00:00"; + } + Instant instant = Instant.from(ZonedDateTime + .of(LocalDateTime.parse(updatedDim, DateTimeFormatter.ofPattern(dateFormat)), + ZoneId.systemDefault())); + validateTimeStampRange(instant.getEpochSecond()); + long us = Math.multiplyExact(instant.getEpochSecond(), 1000L); + return Math.addExact(us, instant.getNano() * 1000L); + } + private static Object parseTimestamp(String dimensionValue, String dateFormat) { Date dateToStr; DateFormat dateFormatter = null; long timeValue; + if (Boolean.parseBoolean(CarbonProperties.getInstance().getProperty(CarbonCommonConstants + .CARBON_SPARK_VERSION_SPARK3))) { + try { + return createTimeInstant(dimensionValue, dateFormat); Review comment: code removed ########## File path: core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java ########## @@ -439,10 +445,31 @@ public static Object getDataDataTypeForNoDictionaryColumn(String dimensionValue, } } + private static long createTimeInstant(String dimensionValue, String dateFormat) { + String updatedDim = dimensionValue; + if (!dimensionValue.trim().contains(" ")) { + updatedDim += " 00:00:00"; Review comment: code removed ########## File path: examples/spark/src/main/scala/org/apache/carbondata/examples/LuceneIndexExample.scala ########## @@ -30,7 +30,7 @@ object LuceneIndexExample { def main(args: Array[String]) { val spark = ExampleUtils.createSparkSession("LuceneIndexExample") - exampleBody(spark) + exampleBody(spark) Review comment: done ########## File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ########## @@ -2644,5 +2644,4 @@ private CarbonCommonConstants() { public static final String CARBON_MAP_ORDER_PUSHDOWN = "carbon.mapOrderPushDown"; public static final String CARBON_SDK_EMPTY_METADATA_PATH = "emptyMetadataFolder"; - Review comment: done ########## File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala ########## @@ -0,0 +1,734 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import java.net.URI +import java.sql.{Date, Timestamp} +import java.time.ZoneId + +import javax.xml.bind.DatatypeConverter + +import scala.annotation.tailrec +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import org.antlr.v4.runtime.tree.TerminalNode +import org.apache.spark.{SparkContext, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} +import org.apache.spark.serializer.Serializer +import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule +import org.apache.spark.sql.catalyst.{CarbonParserUtil, InternalRow, QueryPlanningTracker, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedRelation} Review comment: corrected ########## File path: examples/spark/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala ########## @@ -151,7 +151,7 @@ object StructuredStreamingExample { // Write data from socket stream to carbondata file qry = readSocketDF.writeStream .format("carbondata") - .trigger(ProcessingTime("5 seconds")) + .trigger(CarbonToSparkAdapter.getProcessingTime("5 seconds")) Review comment: added to scala package from spark_version ########## File path: integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala ########## @@ -183,7 +183,7 @@ object DataLoadProcessorStepOnSpark { val rowConverter = new RowConverterImpl(conf.getDataFields, conf, badRecordLogger) rowConverter.initialize() - TaskContext.get().addTaskCompletionListener { context => + CarbonToSparkAdapter.addTaskCompletionListener { Review comment: addTaskCompletionListener API change in Spark3.1 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala ########## @@ -78,10 +78,8 @@ case class CarbonCreateDataSourceTableCommand( catalogTable.partitionColumnNames, caseSensitiveAnalysis) val rows = try { - CreateDataSourceTableCommand( - catalogTable, - ignoreIfExists - ).run(sparkSession) + org.apache.spark.sql.execution.CreateDataSourceTableCommand + .createDataSource(catalogTable, ignoreIfExists, sparkSession) Review comment: done ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala ########## @@ -501,40 +321,6 @@ object CarbonSparkSqlParserUtil { } } - /** - * Validates the partition columns and return's A tuple of partition columns and partitioner - * fields. - * - * @param partitionColumns An instance of ColTypeListContext having parser rules for - * column. - * @param colNames <Seq[String]> Sequence of Table column names. - * @param tableProperties <Map[String, String]> Table property map. - * @param partitionByStructFields Seq[StructField] Sequence of partition fields. - * @return <Seq[PartitionerField]> A Seq of partitioner fields. 
- */ - def validatePartitionFields( Review comment: added to common package ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/jobs/SparkBlockletIndexLoaderJob.scala ########## @@ -160,9 +160,7 @@ class IndexLoaderRDD( val reader = indexFormat.createRecordReader(inputSplit, attemptContext) val iter = new Iterator[(TableBlockIndexUniqueIdentifier, BlockletIndexDetailsWithSchema)] { // in case of success, failure or cancellation clear memory and stop execution - context.addTaskCompletionListener { _ => - reader.close() - } + CarbonToSparkAdapter.addTaskCompletionListener(reader.close()) Review comment: addTaskCompletionListener API changed in Spark 3.1 ########## File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala ########## @@ -0,0 +1,735 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.net.URI +import java.sql.{Date, Timestamp} +import java.time.ZoneId + +import javax.xml.bind.DatatypeConverter + +import scala.annotation.tailrec +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.antlr.v4.runtime.tree.TerminalNode +import org.apache.spark.{SparkContext, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} +import org.apache.spark.serializer.Serializer +import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule +import org.apache.spark.sql.catalyst.{CarbonParserUtil, InternalRow, QueryPlanningTracker, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedRelation} +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, ExternalCatalogWithListener, SessionCatalog} +import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSeq, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, Predicate, ScalaUDF, SortOrder, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide, Optimizer} +import org.apache.spark.sql.catalyst.parser.ParserUtils.operationNotAllowed +import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{BucketSpecContext, ColTypeListContext, CreateTableHeaderContext, LocationSpecContext, PartitionFieldListContext, QueryContext, SkewSpecContext, TablePropertyListContext} +import org.apache.spark.sql.catalyst.plans.{JoinType, QueryPlan, logical} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, InsertIntoStatement, 
Join, JoinHint, LogicalPlan, OneRowRelation, QualifiedColType, Statistics, SubqueryAlias} +import org.apache.spark.sql.catalyst.plans.physical.SinglePartition +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode} +import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter} +import org.apache.spark.sql.execution.{ExplainMode, QueryExecution, SQLExecution, ShuffledRowRDD, SimpleMode, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.command.{ExplainCommand, Field, PartitionerField, RefreshTableCommand, TableModel, TableNewProcessor} +import org.apache.spark.sql.execution.command.table.{CarbonCreateTableAsSelectCommand, CarbonCreateTableCommand} +import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FilePartition, PartitionedFile} +import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec +import org.apache.spark.sql.execution.metric.SQLShuffleWriteMetricsReporter +import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan +import org.apache.spark.sql.hive.HiveExternalCatalog +import org.apache.spark.sql.internal.{SessionState, SharedState} +import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule, MVRewriteRule} +import org.apache.spark.sql.parser.CarbonSpark2SqlParser +import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil.{checkIfDuplicateColumnExists, convertDbNameToLowerCase, validateStreamingProperty} +import org.apache.spark.sql.secondaryindex.optimizer.CarbonSITransformationRule +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.sql.types.{AbstractDataType, CharType, DataType, Metadata, StringType, StructField, VarcharType} +import org.apache.spark.sql.util.SparkSQLUtil +import org.apache.spark.unsafe.types.UTF8String + +import org.apache.carbondata.common.exceptions.DeprecatedFeatureException +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.datatype.DataTypes +import org.apache.carbondata.core.metadata.schema.SchemaReader +import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.geo.{InPolygonJoinUDF, ToRangeListAsStringUDF} +import org.apache.carbondata.mv.plans.modular.{GroupBy, ModularPlan, Select} +import org.apache.carbondata.spark.CarbonOption +import org.apache.carbondata.spark.util.CarbonScalaUtil + +object CarbonToSparkAdapter { + + def addSparkSessionListener(sparkSession: SparkSession): Unit = { + sparkSession.sparkContext.addSparkListener(new SparkListener { + override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { + CarbonEnv.carbonEnvMap.remove(sparkSession) + ThreadLocalSessionInfo.unsetAll() + } + }) + } + + def addSparkListener(sparkContext: SparkContext): Unit = { + sparkContext.addSparkListener(new SparkListener { + override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { + SparkSession.setDefaultSession(null) + } + }) + } + + def createAttributeReference( + name: String, + dataType: DataType, + nullable: Boolean, + metadata: Metadata, + exprId: ExprId, + qualifier: Option[String], + attrRef : 
NamedExpression = null): AttributeReference = { + val qf = if (qualifier.nonEmpty) Seq(qualifier.get) else Seq.empty + AttributeReference( + name, + dataType, + nullable, + metadata)(exprId, qf) + } + + def createAttributeReference( + name: String, + dataType: DataType, + nullable: Boolean, + metadata: Metadata, + exprId: ExprId, + qualifier: Seq[String]): AttributeReference = { + AttributeReference( + name, + dataType, + nullable, + metadata)(exprId, qualifier) + } + + def lowerCaseAttribute(expression: Expression): Expression = expression.transform { + case attr: AttributeReference => + CarbonToSparkAdapter.createAttributeReference( + attr.name.toLowerCase, + attr.dataType, + attr.nullable, + attr.metadata, + attr.exprId, + attr.qualifier) + } + + def createAttributeReference(attr: AttributeReference, + attrName: String, + newSubsume: String): AttributeReference = { + AttributeReference(attrName, attr.dataType)( + exprId = attr.exprId, + qualifier = newSubsume.split("\n").map(_.trim)) + } + + def createScalaUDF(s: ScalaUDF, reference: AttributeReference): ScalaUDF = { + s.copy(children = Seq(reference)) + } + + def createExprCode(code: String, isNull: String, value: String, dataType: DataType): ExprCode = { + ExprCode( + code"$code", + JavaCode.isNullVariable(isNull), + JavaCode.variable(value, dataType)) + } + + def createAliasRef( + child: Expression, + name: String, + exprId: ExprId = NamedExpression.newExprId, + qualifier: Seq[String] = Seq.empty, + explicitMetadata: Option[Metadata] = None, + namedExpr: Option[NamedExpression] = None) : Alias = { Review comment: done ########## File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/avro/AvroFileFormatFactory.scala ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.avro Review comment: done ########## File path: mv/plan/src/main/spark2.3/org/apache/carbondata/mv/plans/modular/ExpressionHelper.scala ########## @@ -47,4 +55,110 @@ object ExpressionHelper { reference.qualifier.head } + def getStatisticsObj(outputList: Seq[NamedExpression], + plan: LogicalPlan, stats: Statistics, + aliasMap: Option[AttributeMap[Attribute]] = None): Statistics = { + val output = outputList.map(_.toAttribute) + val mapSeq = plan.collect { case n: logical.LeafNode => n }.map { + table => AttributeMap(table.output.zip(output)) + } + val rewrites = mapSeq.head + val attributes: AttributeMap[ColumnStat] = stats.attributeStats + var attributeStats = AttributeMap(attributes.iterator + .map { pair => (rewrites(pair._1), pair._2) }.toSeq) + if (aliasMap.isDefined) { + attributeStats = AttributeMap( + attributeStats.map(pair => (aliasMap.get(pair._1), pair._2)).toSeq) + } + Statistics(stats.sizeInBytes, stats.rowCount, attributeStats, stats.hints) + } + + def getOptimizedPlan(s: SubqueryExpression): LogicalPlan = { + val Subquery(newPlan) = BirdcageOptimizer.execute(Subquery(s.plan)) + newPlan + } + + def normalizeExpressions(r: NamedExpression, attrs: AttributeSeq): NamedExpression = { + QueryPlan.normalizeExprId(r, attrs) + } + + def attributeMap(rAliasMap: AttributeMap[Attribute]) : AttributeMap[Expression] = { + rAliasMap.asInstanceOf[AttributeMap[Expression]] + } + + def seqOfRules : Seq[Rule[LogicalPlan]] = { + Seq( + // Operator push down + PushProjectionThroughUnion, + ReorderJoin, + EliminateOuterJoin, + PushPredicateThroughJoin, + PushDownPredicate, + ColumnPruning, + // Operator combine + CollapseRepartition, + CollapseProject, + CollapseWindow, + CombineFilters, + CombineLimits, + CombineUnions, + // Constant folding and strength reduction + NullPropagation, + FoldablePropagation, + ConstantFolding, + ReorderAssociativeOperator, + // No need to apply LikeSimplification rule while creating MV + // as modular plan asCompactSql will be set in schema + // LikeSimplification, + BooleanSimplification, + SimplifyConditionals, + RemoveDispensableExpressions, + SimplifyBinaryComparison, + EliminateSorts, + SimplifyCasts, + SimplifyCaseConversionExpressions, + RewriteCorrelatedScalarSubquery, + EliminateSerialization, + RemoveRedundantAliases, + RemoveRedundantProject) + } +} + +trait getVerboseString extends LeafNode { +} + +trait groupByUnaryNode extends UnaryNode { +} + +trait selectModularPlan extends ModularPlan { +} + +trait unionModularPlan extends ModularPlan { +} + +trait oneRowTableLeafNode extends LeafNode { +} + +object MatchJoin { + def unapply(plan : LogicalPlan): Option[(LogicalPlan, LogicalPlan, JoinType, Option[Expression], + Option[Any])] = { + plan match { + case j@Join(left, right, joinType, condition) => + val a = Some(left, right, joinType, condition, None) + a Review comment: done ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/joins/BroadCastSIFilterPushJoin.scala ########## @@ -147,13 +152,15 @@ object BroadCastSIFilterPushJoin { inputCopy: Array[InternalRow], leftKeys: Seq[Expression], rightKeys: Seq[Expression], - buildSide: BuildSide, + buildSide: CarbonBuildSideType, isIndexTable: Boolean = false): Unit = { + val carbonBuildSide = CarbonBuildSide(buildSide) Review comment: done ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala ########## @@ -214,9 +214,7 @@ class CarbonSIRebuildRDD[K, V]( new 
SparkDataTypeConverterImpl) // add task completion listener to clean up the resources - context.addTaskCompletionListener { _ => - close() - } + CarbonToSparkAdapter.addTaskCompletionListener(close()) Review comment: addTaskCompletionListener API changed in Spark 3.1 ########## File path: integration/spark/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala ########## @@ -185,8 +224,390 @@ object CarbonToSparkAdapter { def getHiveExternalCatalog(sparkSession: SparkSession): HiveExternalCatalog = { sparkSession.sessionState.catalog.externalCatalog.asInstanceOf[HiveExternalCatalog] } + + def createFilePartition(index: Int, files: ArrayBuffer[PartitionedFile]) = { + FilePartition(index, files.toArray.toSeq) + } + + def stringToTimestamp(timestamp: String): Option[Long] = { + DateTimeUtils.stringToTimestamp(UTF8String.fromString(timestamp)) + } + + def getTableIdentifier(u: UnresolvedRelation): Some[TableIdentifier] = { + Some(u.tableIdentifier) + } + + def dateToString(date: Int): String = { + DateTimeUtils.dateToString(date.toString.toInt) + } + + def timeStampToString(timeStamp: Long): String = { + DateTimeUtils.timestampToString(timeStamp) + } Review comment: done, added to common2.3and2.4 ########## File path: integration/spark/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala ########## @@ -185,8 +224,390 @@ object CarbonToSparkAdapter { def getHiveExternalCatalog(sparkSession: SparkSession): HiveExternalCatalog = { sparkSession.sessionState.catalog.externalCatalog.asInstanceOf[HiveExternalCatalog] } + + def createFilePartition(index: Int, files: ArrayBuffer[PartitionedFile]) = { + FilePartition(index, files.toArray.toSeq) + } + + def stringToTimestamp(timestamp: String): Option[Long] = { + DateTimeUtils.stringToTimestamp(UTF8String.fromString(timestamp)) + } + + def getTableIdentifier(u: UnresolvedRelation): Some[TableIdentifier] = { + Some(u.tableIdentifier) + } + + def dateToString(date: Int): String = { + DateTimeUtils.dateToString(date.toString.toInt) + } + + def timeStampToString(timeStamp: Long): String = { + DateTimeUtils.timestampToString(timeStamp) + } + + def stringToTime(value: String): java.util.Date = { + DateTimeUtils.stringToTime(value) + } + + def getProcessingTime: String => Trigger = { + Trigger.ProcessingTime + } + + def addTaskCompletionListener[U](f: => U) { + TaskContext.get().addTaskCompletionListener { context => + f + } + } + + def createShuffledRowRDD(sparkContext: SparkContext, localTopK: RDD[InternalRow], + child: SparkPlan, serializer: Serializer): ShuffledRowRDD = { + new ShuffledRowRDD( + ShuffleExchangeExec.prepareShuffleDependency( + localTopK, child.output, SinglePartition, serializer)) + } + + def getInsertIntoCommand(table: LogicalPlan, + partition: Map[String, Option[String]], + query: LogicalPlan, + overwrite: Boolean, + ifPartitionNotExists: Boolean): InsertIntoTable = { + InsertIntoTable( + table, + partition, + query, + overwrite, + ifPartitionNotExists) + } + + def getExplainCommandObj(logicalPlan: LogicalPlan = OneRowRelation(), + mode: Option[String]) : ExplainCommand = { + ExplainCommand(logicalPlan, mode.isDefined) + } + + def getExplainCommandObj(mode: Option[String]) : ExplainCommand = { + ExplainCommand(OneRowRelation(), mode.isDefined) + } + + def invokeAnalyzerExecute(analyzer: Analyzer, + plan: LogicalPlan): LogicalPlan = { + analyzer.executeAndCheck(plan) + } + + def normalizeExpressions(r: NamedExpression, attrs: AttributeSeq): NamedExpression = { + QueryPlan.normalizeExprId(r, attrs) + } + + def 
getBuildRight: BuildSide = { + BuildRight + } + + def getBuildLeft: BuildSide = { + BuildLeft + } + + type CarbonBuildSideType = BuildSide + type InsertIntoStatementWrapper = InsertIntoTable + + def withNewExecutionId[T](sparkSession: SparkSession, queryExecution: QueryExecution): T => T = { + SQLExecution.withNewExecutionId(sparkSession, queryExecution)(_) + } + + def getTableIdentifier(parts: TableIdentifier): TableIdentifier = { + parts + } + + def createJoinNode(child: LogicalPlan, + targetTable: LogicalPlan, + joinType: JoinType, + condition: Option[Expression]): Join = { + Join(child, targetTable, joinType, condition) + } + + def getPartitionsFromInsert(x: InsertIntoStatementWrapper): Map[String, Option[String]] = { + x.partition + } + + def getStatisticsObj(outputList: Seq[NamedExpression], Review comment: yes removed ########## File path: integration/spark/src/main/spark2.3/org/apache/spark/sql/hive/execution/command/CarbonResetCommand.scala ########## @@ -17,7 +17,7 @@ package org.apache.spark.sql.hive.execution.command -import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.{CarbonEnv, CarbonToSparkAdapter, Row, SparkSession} Review comment: removed ########## File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala ########## @@ -229,8 +270,388 @@ object CarbonToSparkAdapter { .unwrapped .asInstanceOf[HiveExternalCatalog] } + + def createFilePartition(index: Int, files: ArrayBuffer[PartitionedFile]): FilePartition = { + FilePartition(index, files.toArray) + } + + def stringToTimestamp(timestamp: String): Option[Long] = { + DateTimeUtils.stringToTimestamp(UTF8String.fromString(timestamp)) + } + + def stringToTime(value: String): java.util.Date = { + DateTimeUtils.stringToTime(value) + } + + def timeStampToString(timeStamp: Long): String = { + DateTimeUtils.timestampToString(timeStamp) + } + + def getTableIdentifier(u: UnresolvedRelation): Some[TableIdentifier] = { + Some(u.tableIdentifier) + } + + def dateToString(date: Int): String = { + DateTimeUtils.dateToString(date.toString.toInt) + } + + def getProcessingTime: String => Trigger = { + Trigger.ProcessingTime + } + + def addTaskCompletionListener[U](f: => U) { + TaskContext.get().addTaskCompletionListener { context => + f + } + } + + def createShuffledRowRDD(sparkContext: SparkContext, localTopK: RDD[InternalRow], + child: SparkPlan, serializer: Serializer): ShuffledRowRDD = { + new ShuffledRowRDD( + ShuffleExchangeExec.prepareShuffleDependency( + localTopK, child.output, SinglePartition, serializer)) + } + + def getInsertIntoCommand(table: LogicalPlan, + partition: Map[String, Option[String]], + query: LogicalPlan, + overwrite: Boolean, + ifPartitionNotExists: Boolean): InsertIntoTable = { + InsertIntoTable( + table, + partition, + query, + overwrite, + ifPartitionNotExists) + } + + def getExplainCommandObj(logicalPlan: LogicalPlan = OneRowRelation(), + mode: Option[String]) : ExplainCommand = { + ExplainCommand(logicalPlan, mode.isDefined) + } + + def invokeAnalyzerExecute(analyzer: Analyzer, + plan: LogicalPlan): LogicalPlan = { + analyzer.executeAndCheck(plan) + } + + def normalizeExpressions(r: NamedExpression, attrs: AttributeSeq): NamedExpression = { + QueryPlan.normalizeExprId(r, attrs) + } + + def getBuildRight: BuildSide = { + BuildRight + } + + def getBuildLeft: BuildSide = { + BuildLeft + } + + type CarbonBuildSideType = BuildSide + type InsertIntoStatementWrapper = InsertIntoTable + + def withNewExecutionId[T](sparkSession: SparkSession, 
queryExecution: QueryExecution): T => T = { + SQLExecution.withNewExecutionId(sparkSession, queryExecution)(_) + } + + def createJoinNode(child: LogicalPlan, + targetTable: LogicalPlan, + joinType: JoinType, + condition: Option[Expression]): Join = { + Join(child, targetTable, joinType, condition) + } + + def getPartitionsFromInsert(x: InsertIntoStatementWrapper): Map[String, Option[String]] = { + x.partition + } + + def getTableIdentifier(parts: TableIdentifier): TableIdentifier = { + parts + } + + def getStatisticsObj(outputList: Seq[NamedExpression], + plan: LogicalPlan, stats: Statistics, + aliasMap: Option[AttributeMap[Attribute]] = None): Statistics = { + val output = outputList.map(_.toAttribute) + val mapSeq = plan.collect { case n: logical.LeafNode => n }.map { + table => AttributeMap(table.output.zip(output)) + } + val rewrites = mapSeq.head + val attributes: AttributeMap[ColumnStat] = stats.attributeStats + var attributeStats = AttributeMap(attributes.iterator + .map { pair => (rewrites(pair._1), pair._2) }.toSeq) + if (aliasMap.isDefined) { + attributeStats = AttributeMap( + attributeStats.map(pair => (aliasMap.get(pair._1), pair._2)).toSeq) + } + Statistics(stats.sizeInBytes, stats.rowCount, attributeStats, stats.hints) + } + + def createRefreshTableCommand(tableIdentifier: TableIdentifier): RefreshTable = { + RefreshTable(tableIdentifier) + } + + type RefreshTables = RefreshTable Review comment: done ########## File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/execution/CarbonCodegenSupport.scala ########## @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution + +import org.apache.spark.sql.execution.joins.HashJoin + +trait CarbonCodegenSupport extends SparkPlan with HashJoin { Review comment: done ########## File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala ########## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.strategy + +import scala.collection.JavaConverters._ + +import org.apache.spark.CarbonInputMetrics +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.CarbonDatasourceHadoopRelation +import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, SortOrder, UnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.{Expression => SparkExpression} +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} +import org.apache.spark.sql.execution.{ColumnarBatchScan, DataSourceScanExec, WholeStageCodegenExec} +import org.apache.spark.sql.optimizer.CarbonFilters +import org.apache.spark.sql.types.AtomicType + +import org.apache.carbondata.core.index.IndexFilter +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.metadata.schema.BucketingInfo +import org.apache.carbondata.core.readcommitter.ReadCommittedScope +import org.apache.carbondata.core.scan.expression.Expression +import org.apache.carbondata.core.scan.expression.logical.AndExpression +import org.apache.carbondata.hadoop.CarbonProjection +import org.apache.carbondata.spark.rdd.CarbonScanRDD + +/** + * Physical plan node for scanning data. It is applied for both tables + * USING carbondata and STORED AS carbondata. + */ +case class CarbonDataSourceScan( Review comment: done ########## File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/hive/CarbonAnalyzer.scala ########## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.Analyzer +import org.apache.spark.sql.catalyst.catalog.SessionCatalog +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.CarbonReflectionUtils + +class CarbonAnalyzer(catalog: SessionCatalog, Review comment: done ########## File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.parser.ParserUtils.string +import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, CreateHiveTableContext} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SparkSqlAstBuilder +import org.apache.spark.sql.internal.SQLConf Review comment: done ########## File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.spark.sql.CarbonToSparkAdapter Review comment: done ########## File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/hive/execution/command/CarbonResetCommand.scala ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution.command + +import org.apache.spark.sql.{CarbonEnv, CarbonToSparkAdapter, Row, SparkSession} Review comment: done ########## File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/parser/CarbonExtensionSqlParser.scala ########## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.parser + Review comment: done ########## File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ Review comment: done ########## File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonBoundReference.scala ########## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, ExprId, LeafExpression, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.types.DataType + +import org.apache.carbondata.core.scan.expression.ColumnExpression + +case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nullable: Boolean) Review comment: common in 2.3 and 2.4, added to common code ########## File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/SparkSqlAdapter.scala ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.expressions.{Attribute, EmptyRow, Expression} +import org.apache.spark.sql.execution.FileSourceScanExec +import org.apache.spark.sql.execution.datasources.HadoopFsRelation +import org.apache.spark.sql.types.StructType + +object SparkSqlAdapter { + + def initSparkSQL(): Unit = { + } + + def getScanForSegments( + @transient relation: HadoopFsRelation, Review comment: FileSourceScanExec API is different in spark2.3, 2.4 and 3.1. Different number of arguments in all 3. ########## File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/parser/SparkSqlAstBuilderWrapper.scala ########## @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.parser + +import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ +import org.apache.spark.sql.execution.SparkSqlAstBuilder +import org.apache.spark.sql.internal.SQLConf + +/** + * use this wrapper to adapter multiple spark versions + */ +abstract class SparkSqlAstBuilderWrapper(conf: SQLConf) extends SparkSqlAstBuilder { Review comment: done ########## File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/parser/CarbonExtensionSqlParser.scala ########## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.parser + +import org.apache.spark.sql.{CarbonEnv, CarbonThreadUtil, SparkSession} +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SparkSqlParser +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.util.CarbonException + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.spark.util.CarbonScalaUtil + +/** + * parser order: carbon parser => spark parser + */ +class CarbonExtensionSqlParser( Review comment: done ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala ########## @@ -321,7 +321,7 @@ object DeleteExecution { deleteStatus = SegmentStatus.SUCCESS } catch { case e : MultipleMatchingException => - LOGGER.error(e.getMessage) + LOGGER.error(e.getMessage) Review comment: done ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/util/CreateTableCommonUtil.scala ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.util + +import org.apache.log4j.Logger +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils} +import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation} +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.sql.sources.BaseRelation +import org.apache.spark.sql.types.StructType + +object CreateTableCommonUtil { + + def getNewTable(sparkSession: SparkSession, sessionState: SessionState, Review comment: done ########## File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala ########## @@ -360,9 +359,14 @@ object CarbonStore { private def validateTimeFormat(timestamp: String): Long = { try { - DateTimeUtils.stringToTimestamp(UTF8String.fromString(timestamp)).get + CarbonToSparkAdapter.stringToTimestamp(timestamp) match { + case Some(value) => value + case _ => + val errorMessage = "Error: Invalid load start time format: " + timestamp Review comment: done ########## File path: integration/spark/src/main/spark2.3/org/apache/spark/sql/execution/CarbonCodegenSupport.scala ########## @@ -14,15 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ +package org.apache.spark.sql.execution -package org.apache.carbondata.spark.adapter +import org.apache.spark.sql.execution.joins.HashJoin -import scala.collection.mutable.ArrayBuffer +trait CarbonCodegenSupport extends SparkPlan with HashJoin { Review comment: removed hashjoin from BroadCastSIfilterPushjoin ########## File path: integration/spark/src/main/spark2.3/org/apache/spark/sql/execution/CreateDataSourceTableCommand.scala ########## @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.catalog.CatalogTable + +object CreateDataSourceTableCommand { Review comment: added to common code ########## File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/CarbonBoundReference.scala ########## @@ -42,4 +42,4 @@ case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nu override def qualifier: Seq[String] = null override def newInstance(): NamedExpression = throw new UnsupportedOperationException -} \ No newline at end of file +} Review comment: done ########## File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/execution/CreateDataSourceTableCommand.scala ########## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import org.apache.log4j.Logger +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils} +import org.apache.spark.sql.execution.command.RunnableCommand +import org.apache.spark.sql.util.CreateTableCommonUtil.getNewTable + +import org.apache.carbondata.common.logging.LogServiceFactory + +case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boolean) + extends RunnableCommand { + + val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName) + + override def run(sparkSession: SparkSession): Seq[Row] = { + assert(table.tableType != CatalogTableType.VIEW) + assert(table.provider.isDefined) + val sessionState = sparkSession.sessionState + if (sessionState.catalog.tableExists(table.identifier)) { + if (ignoreIfExists) { + return Seq.empty[Row] + } else { + throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.") + } + } + val newTable: CatalogTable = getNewTable(sparkSession, sessionState, table, LOGGER) + + // We will return Nil or throw exception at the beginning if the table already exists, so when + // we reach here, the table should not exist and we should set `ignoreIfExists` to false. + sessionState.catalog.createTable(newTable, ignoreIfExists = false, validateLocation = false) + Seq.empty[Row] + } +} + +object CreateDataSourceTableCommand { Review comment: It is common to 3.1, combined it with 3.1 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala ########## @@ -240,6 +235,21 @@ object DMLStrategy extends SparkStrategy { condition.get.asInstanceOf[ScalaUDF].function.isInstanceOf[InPolygonJoinRangeListUDF] } + object CarbonExtractEquiJoinKeys { + def unapply(plan: LogicalPlan): Option[(JoinType, Seq[Expression], Seq[Expression], + Option[Expression], LogicalPlan, LogicalPlan)] = { + plan match { + case join: Join => + ExtractEquiJoinKeys.unapply(join) match { + // ignoring hints as carbon is not using them right now Review comment: done ########## File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive + +import java.util + +import org.apache.spark.sql.CarbonToSparkAdapter + +import scala.collection.JavaConverters._ +import org.apache.spark.sql.catalyst.CarbonParserUtil +import org.apache.spark.sql.catalyst.parser.SqlBaseParser +import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, CreateTableContext, HiveChangeColumnContext} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, QualifiedColType} +import org.apache.spark.sql.execution.SparkSqlAstBuilder +import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel} +import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableColRenameDataTypeChangeCommand} +import org.apache.spark.sql.execution.command.table.CarbonExplainCommand +import org.apache.spark.sql.parser.CarbonSpark2SqlParser +import org.apache.spark.sql.types.{DecimalType, StructField} + +trait SqlAstBuilderHelper extends SparkSqlAstBuilder { + + Review comment: done ########## File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala ########## @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.strategy + +import scala.collection.JavaConverters._ + +import org.apache.spark.CarbonInputMetrics +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonToSparkAdapter} +import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, SortOrder, UnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.{Expression => SparkExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} +import org.apache.spark.sql.execution.{DataSourceScanExec, WholeStageCodegenExec} +import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.optimizer.CarbonFilters +import org.apache.spark.sql.vectorized.ColumnarBatch + +import org.apache.carbondata.core.index.IndexFilter +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.metadata.schema.BucketingInfo +import org.apache.carbondata.core.readcommitter.ReadCommittedScope +import org.apache.carbondata.core.scan.expression.Expression +import org.apache.carbondata.core.scan.expression.logical.AndExpression +import org.apache.carbondata.hadoop.CarbonProjection +import org.apache.carbondata.spark.rdd.CarbonScanRDD + +/** + * Physical plan node for scanning data. It is applied for both tables + * USING carbondata and STORED AS carbondata. + */ +case class CarbonDataSourceScan( + @transient relation: CarbonDatasourceHadoopRelation, + output: Seq[Attribute], + partitionFilters: Seq[SparkExpression], + dataFilters: Seq[SparkExpression], + @transient readComittedScope: ReadCommittedScope, + @transient pushedDownProjection: CarbonProjection, + @transient pushedDownFilters: Seq[Expression], + directScanSupport: Boolean, + @transient extraRDD: Option[(RDD[InternalRow], Boolean)] = None, + tableIdentifier: Option[TableIdentifier] = None, + segmentIds: Option[String] = None) + extends DataSourceScanExec { + + override lazy val supportsColumnar: Boolean = CarbonPlanHelper + .supportBatchedDataSource(sqlContext, output, extraRDD) + + override protected def doExecuteColumnar(): RDD[ColumnarBatch] = { + val numOutputRows = longMetric("numOutputRows") + inputRDD.asInstanceOf[RDD[ColumnarBatch]].mapPartitionsInternal { batches => + new Iterator[ColumnarBatch] { + + override def hasNext: Boolean = { + val res = batches.hasNext + res + } + + override def next(): ColumnarBatch = { + val batch = batches.next() + numOutputRows += batch.numRows() + batch + } + } + } + } + + override lazy val metrics = Map( + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) + + lazy val needsUnsafeRowConversion: Boolean = { true } + + override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = { + val info: BucketingInfo = relation.carbonTable.getBucketingInfo + if (info != null) { + val cols = info.getListOfColumns.asScala + val numBuckets = info.getNumOfRanges + val bucketColumns = cols.flatMap { n => + val attrRef = output.find(_.name.equalsIgnoreCase(n.getColumnName)) + attrRef match { + case Some(attr: AttributeReference) => + Some(AttributeReference(attr.name, + 
CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(n.getDataType), + attr.nullable, + attr.metadata)(attr.exprId, attr.qualifier)) + case _ => None + } + } + if (bucketColumns.size == cols.size) { + // use HashPartitioning will not shuffle + (HashPartitioning(bucketColumns, numBuckets), Nil) + } else { + (UnknownPartitioning(0), Nil) + } + } else { + (UnknownPartitioning(0), Nil) + } + } + + override lazy val metadata: Map[String, String] = { + def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]") + val metadata = + Map( + "ReadSchema" -> seqToString(pushedDownProjection.getAllColumns), + "Batched" -> supportsColumnar.toString, + "DirectScan" -> (supportsColumnar && directScanSupport).toString, + "PushedFilters" -> seqToString(pushedDownFilters.map(_.getStatement))) + if (relation.carbonTable.isHivePartitionTable) { + metadata + ("PartitionFilters" -> seqToString(partitionFilters)) + + ("PartitionCount" -> selectedPartitions.size.toString) + } else { + metadata + } + } + + @transient private lazy val indexFilter: IndexFilter = { + val filter = pushedDownFilters.reduceOption(new AndExpression(_, _)) + .map(new IndexFilter(relation.carbonTable, _, true)).orNull + if (filter != null && pushedDownFilters.length == 1) { + // push down the limit if only one filter + filter.setLimit(relation.limit) + } + filter + } + + @transient private lazy val selectedPartitions: Seq[PartitionSpec] = { + CarbonFilters + .getPartitions(partitionFilters, relation.sparkSession, relation.carbonTable) + .orNull + } + + private lazy val inputRDD: RDD[InternalRow] = { + val carbonRdd = new CarbonScanRDD[InternalRow]( + relation.sparkSession, + pushedDownProjection, + indexFilter, + relation.identifier, + relation.carbonTable.getTableInfo.serialize(), + relation.carbonTable.getTableInfo, + new CarbonInputMetrics, + selectedPartitions, + segmentIds = segmentIds) + carbonRdd.setVectorReaderSupport(supportsColumnar) + carbonRdd.setDirectScanSupport(supportsColumnar && directScanSupport) + extraRDD.map(_._1.union(carbonRdd)).getOrElse(carbonRdd) + } + Review comment: done ########## File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala ########## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive + +import java.util.concurrent.Callable + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.{CarbonEnv, SparkSession} +import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition, ExternalCatalogWithListener, FunctionResourceLoader, GlobalTempViewManager} +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.connector.catalog.CatalogManager +import org.apache.spark.sql.execution.strategy.{CarbonSourceStrategy, DDLStrategy, DMLStrategy, StreamingTableStrategy} +import org.apache.spark.sql.hive.client.HiveClient +import org.apache.spark.sql.internal.{SessionState, SQLConf} +import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule} +import org.apache.spark.sql.parser.CarbonSparkSqlParser + +import org.apache.carbondata.core.metadata.schema.table.CarbonTable + +/** + * This class will have carbon catalog and refresh the relation from cache if the carbontable in + * carbon catalog is not same as cached carbon relation's carbon table + * + * @param externalCatalog + * @param globalTempViewManager + * @param sparkSession + * @param functionResourceLoader + * @param functionRegistry + * @param conf + * @param hadoopConf + */ +class CarbonHiveSessionCatalog( Review comment: different class definition ########## File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/execution/CreateDataSourceTableCommand.scala ########## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution + +import org.apache.log4j.Logger +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils} +import org.apache.spark.sql.execution.command.RunnableCommand +import org.apache.spark.sql.util.CreateTableCommonUtil.getNewTable + +import org.apache.carbondata.common.logging.LogServiceFactory + +case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boolean) + extends RunnableCommand { + + val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName) + + override def run(sparkSession: SparkSession): Seq[Row] = { + assert(table.tableType != CatalogTableType.VIEW) + assert(table.provider.isDefined) + + val sessionState = sparkSession.sessionState + if (sessionState.catalog.tableExists(table.identifier)) { + if (ignoreIfExists) { + return Seq.empty[Row] + } else { + throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.") + } Review comment: done ########## File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/execution/CarbonCodegenSupport.scala ########## @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext +import org.apache.spark.sql.execution.joins.HashJoin + +trait CarbonCodegenSupport extends SparkPlan with HashJoin { + + // TODO: Spark has started supporting Codegen for Join, Carbon needs to implement the same. + override def supportCodegen: Boolean = false Review comment: okay -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
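A note for readers on the CarbonStore.validateTimeFormat hunk earlier in this batch: the call is routed through CarbonToSparkAdapter because the Spark-internal parsing API differs across the supported Spark versions (in Spark 3.1, DateTimeUtils.stringToTimestamp also takes a time zone), and matching on the returned Option makes the invalid-format error explicit instead of relying on a bare .get. The sketch below only illustrates the Spark 3.1 call shape that such an adapter wraps; the object name is hypothetical and this is not the PR's actual adapter code.

    import java.time.ZoneId

    import org.apache.spark.sql.catalyst.util.DateTimeUtils
    import org.apache.spark.unsafe.types.UTF8String

    // Hypothetical shim in the spirit of CarbonToSparkAdapter.stringToTimestamp:
    // parses a timestamp string into microseconds since the epoch, returning
    // None for an invalid string instead of throwing.
    object TimestampAdapterSketch {
      def stringToTimestamp(timestamp: String): Option[Long] = {
        DateTimeUtils.stringToTimestamp(
          UTF8String.fromString(timestamp),
          ZoneId.systemDefault())
      }

      def main(args: Array[String]): Unit = {
        println(stringToTimestamp("2021-06-04 12:00:00")) // Some(<microseconds>)
        println(stringToTimestamp("not a timestamp"))     // None
      }
    }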
In reply to this post by GitBox
vikramahuja1001 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r645381006
########## File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/execution/CarbonCodegenSupport.scala ########## @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext +import org.apache.spark.sql.execution.joins.HashJoin + +trait CarbonCodegenSupport extends SparkPlan with HashJoin { + + // TODO: Spark has started supporting Codegen for Join, Carbon needs to implement the same. + override def supportCodegen: Boolean = false Review comment: okay, tracked in https://issues.apache.org/jira/browse/CARBONDATA-4197
########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala ########## @@ -704,5 +705,13 @@ private[sql] case class CarbonCreateSecondaryIndexCommand( columnSchema } + override def clone(): LogicalPlan = { + CarbonCreateSecondaryIndexCommand(indexModel, Review comment: this method is called from the Spark side
########## File path: integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ########## @@ -604,11 +605,12 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { protected lazy val alterTableColumnRenameAndModifyDataType: Parser[LogicalPlan] = - ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ CHANGE ~ ident ~ ident ~ - ident ~ opt("(" ~> rep1sep(valueOptions, ",") <~ ")") <~ opt(";") ^^ { - case dbName ~ table ~ change ~ columnName ~ columnNameCopy ~ dataType ~ values => + ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (CHANGE ~> ident) ~ ident ~ ident ~ Review comment: it already exists
########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ########## @@ -67,6 +68,8 @@ object DDLStrategy extends SparkStrategy { case changeColumn: AlterTableChangeColumnCommand if isCarbonTable(changeColumn.tableName) => ExecutedCommandExec(DDLHelper.changeColumn(changeColumn, sparkSession)) :: Nil + case changeColumn: CarbonAlterTableColRenameDataTypeChangeCommand => Review comment: Previously, Spark parsed the alter column rename command, but that support has now been removed. We could either alter the command definition to match the updated Spark grammar or parse it on our own. Since the method was already in the CarbonData code, we decided to parse it in CarbonData only.
########## File path: integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala ########## @@ -181,7 +182,7 @@ object CarbonSparkUtil { */ def createHadoopJob(conf: Configuration = FileFactory.getConfiguration): Job = { val jobConf = new JobConf(conf) - SparkHadoopUtil.get.addCredentials(jobConf) + SparkUtil.addCredentials(jobConf) Review comment: Spark has made the addCredentials method private[spark], so it can't be accessed directly from the CarbonData package. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
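One more note on the CarbonSparkUtil.createHadoopJob comment above: since SparkHadoopUtil is no longer reachable from CarbonData's own packages, a delegating helper has to sit inside an org.apache.spark package to keep calling it. The sketch below is illustrative only; the package and object names are assumptions and this is not the PR's actual SparkUtil.

    package org.apache.spark.util

    import org.apache.hadoop.mapred.JobConf

    import org.apache.spark.deploy.SparkHadoopUtil

    // Hypothetical wrapper: compiled under org.apache.spark it can reach the
    // private[spark] SparkHadoopUtil and merge the current user's Hadoop
    // credentials into the job configuration on behalf of carbondata callers.
    object CredentialUtilSketch {
      def addCredentials(jobConf: JobConf): Unit = {
        SparkHadoopUtil.get.addCredentials(jobConf)
      }
    }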
In reply to this post by GitBox
vikramahuja1001 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r645396128 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/test/SparkTestQueryExecutor.scala ########## @@ -71,6 +71,12 @@ object SparkTestQueryExecutor { .config("spark.sql.warehouse.dir", warehouse) .config("spark.sql.crossJoin.enabled", "true") .config("spark.sql.extensions", extensions) + .config("spark.sql.storeAssignmentPolicy", "legacy") + .config("spark.sql.legacy.timeParserPolicy", "legacy") Review comment: done ########## File path: examples/spark/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala ########## @@ -93,6 +93,7 @@ object ExampleUtils { .config("spark.driver.host", "localhost") .config("spark.sql.crossJoin.enabled", "true") .config("spark.sql.extensions", "org.apache.spark.sql.CarbonExtensions") + .config("spark.sql.legacy.timeParserPolicy", "LEGACY") Review comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
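For anyone reproducing this setup outside the CarbonData build, the following is a minimal, self-contained sketch of a SparkSession created with the same legacy policies enabled above for Spark 3.1 compatibility; the master, app name, and query are placeholders.

    import org.apache.spark.sql.SparkSession

    object LegacyPolicySessionSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("legacy-policy-sketch")
          // Keep the pre-3.0 implicit casting on INSERT instead of the ANSI
          // store assignment checks that Spark 3.x applies by default.
          .config("spark.sql.storeAssignmentPolicy", "legacy")
          // Parse date/timestamp strings with the pre-3.0 behaviour rather
          // than the stricter Spark 3.x parsers.
          .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
          .getOrCreate()

        spark.sql("SELECT to_timestamp('2021-06-04 12:00:00') AS ts").show(false)
        spark.stop()
      }
    }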
In reply to this post by GitBox
vikramahuja1001 commented on a change in pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#discussion_r645397625 ########## File path: index/examples/pom.xml ########## @@ -81,9 +81,6 @@ <profiles> <profile> <id>spark-2.3</id> - <activation> - <activeByDefault>true</activeByDefault> - </activation> Review comment: no new profile is added here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#issuecomment-854538244 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5499/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#issuecomment-854540268 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3757/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#issuecomment-854703925 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3760/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#issuecomment-854705223 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5503/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#issuecomment-854806997 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3762/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#issuecomment-854810300 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5505/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#issuecomment-855397465 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5509/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#issuecomment-855397487 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3766/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
kunal642 commented on pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#issuecomment-855624385 LGTM @vikramahuja1001 please handle the comments and/or reply to the comments; let's get this merged ASAP -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
In reply to this post by GitBox
kunal642 removed a comment on pull request #4141: URL: https://github.com/apache/carbondata/pull/4141#issuecomment-855624385 LGTM @vikramahuja1001 please handle the comments and/or reply to the comments; let's get this merged ASAP -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |