[GitHub] [carbondata] vikramahuja1001 opened a new pull request #4141: [CARBONDATA-4190] Integrate Carbondata with Spark 3.1.1 version


[GitHub] [carbondata] vikramahuja1001 opened a new pull request #4141: [CARBONDATA-4190] Integrate Carbondata with Spark 3.1.1 version

GitBox

vikramahuja1001 opened a new pull request #4141:
URL: https://github.com/apache/carbondata/pull/4141


    ### Why is this PR needed?
    To integrate Carbondata with Spark 3.1.1.

    ### What changes were proposed in this PR?
    Refactored the code to support Spark 3.1.1 alongside the existing Spark 2.3 and 2.4 versions.

    ### Does this PR introduce any user interface change?
    - No

    ### Is any new testcase added?
    - No
   
   
       
   


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]



[GitHub] [carbondata] akashrn5 commented on pull request #4141: [CARBONDATA-4190] Integrate Carbondata with Spark 3.1.1 version

GitBox

akashrn5 commented on pull request #4141:
URL: https://github.com/apache/carbondata/pull/4141#issuecomment-848059100


   @vikramahuja1001 please update the description with more details about all the changes included in this PR





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4141: [CARBONDATA-4190] Integrate Carbondata with Spark 3.1.1 version

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4141:
URL: https://github.com/apache/carbondata/pull/4141#issuecomment-848135381


   Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5432/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4141: [CARBONDATA-4190] Integrate Carbondata with Spark 3.1.1 version

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4141:
URL: https://github.com/apache/carbondata/pull/4141#issuecomment-848154207


   Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3688/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4141: [CARBONDATA-4190] Integrate Carbondata with Spark 3.1.1 version

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4141:
URL: https://github.com/apache/carbondata/pull/4141#issuecomment-848158231


   Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5433/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4141: [CARBONDATA-4190] Integrate Carbondata with Spark 3.1.1 version

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4141:
URL: https://github.com/apache/carbondata/pull/4141#issuecomment-848231109


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3689/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4141: [CARBONDATA-4190] Integrate Carbondata with Spark 3.1.1 version

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4141:
URL: https://github.com/apache/carbondata/pull/4141#issuecomment-848232531


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5434/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4141: [CARBONDATA-4190] Integrate Carbondata with Spark 3.1.1 version

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4141:
URL: https://github.com/apache/carbondata/pull/4141#issuecomment-848689465


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5440/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4141: [CARBONDATA-4190] Integrate Carbondata with Spark 3.1.1 version

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4141:
URL: https://github.com/apache/carbondata/pull/4141#issuecomment-848691115


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3695/
   





[GitHub] [carbondata] akashrn5 commented on a change in pull request #4141: [CARBONDATA-4190] Integrate Carbondata with Spark 3.1.1 version

GitBox
In reply to this post by GitBox

akashrn5 commented on a change in pull request #4141:
URL: https://github.com/apache/carbondata/pull/4141#discussion_r639004523



##########
File path: core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
##########
@@ -439,10 +445,31 @@ public static Object getDataDataTypeForNoDictionaryColumn(String dimensionValue,
     }
   }
 
+  private static long createTimeInstant(String dimensionValue, String dateFormat) {
+    String updatedDim = dimensionValue;
+    if (!dimensionValue.trim().contains(" ")) {
+      updatedDim += " 00:00:00";
+    }
+    Instant instant = Instant.from(ZonedDateTime
+        .of(LocalDateTime.parse(updatedDim, DateTimeFormatter.ofPattern(dateFormat)),
+        ZoneId.systemDefault()));
+    validateTimeStampRange(instant.getEpochSecond());
+    long us = Math.multiplyExact(instant.getEpochSecond(), 1000L);
+    return Math.addExact(us, instant.getNano() * 1000L);
+  }
+
   private static Object parseTimestamp(String dimensionValue, String dateFormat) {
     Date dateToStr;
     DateFormat dateFormatter = null;
     long timeValue;
+    if (Boolean.parseBoolean(CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+        .CARBON_SPARK_VERSION_SPARK3))) {
+      try {
+        return createTimeInstant(dimensionValue, dateFormat);

Review comment:
       please add a comment here explaining why this change is needed for Spark 3, for future reference
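
For context, a minimal, self-contained sketch (not taken from the PR; names are illustrative) of the padding and java.time parsing that the new createTimeInstant path performs for Spark 3: a date-only value is first padded with " 00:00:00" so the full timestamp pattern still matches, then parsed in the session time zone.

    import java.time.{Instant, LocalDateTime, ZoneId, ZonedDateTime}
    import java.time.format.DateTimeFormatter

    // Illustrative only: mirrors the padding + java.time parsing in the hunk above.
    // Returns epoch milliseconds for simplicity; the PR converts to Spark's internal unit.
    def parseWithPadding(dimensionValue: String, dateFormat: String): Long = {
      // date-only values get a midnight time component so the full pattern still matches
      val padded =
        if (dimensionValue.trim.contains(" ")) dimensionValue else dimensionValue + " 00:00:00"
      val zoned = ZonedDateTime.of(
        LocalDateTime.parse(padded, DateTimeFormatter.ofPattern(dateFormat)),
        ZoneId.systemDefault())
      Instant.from(zoned).toEpochMilli
    }

    // e.g. parseWithPadding("2021-05-25", "yyyy-MM-dd HH:mm:ss")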

##########
File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
##########
@@ -2644,5 +2644,4 @@ private CarbonCommonConstants() {
   public static final String CARBON_MAP_ORDER_PUSHDOWN = "carbon.mapOrderPushDown";
 
   public static final String CARBON_SDK_EMPTY_METADATA_PATH = "emptyMetadataFolder";
-

Review comment:
       revert this change

##########
File path: examples/spark/src/main/scala/org/apache/carbondata/examples/LuceneIndexExample.scala
##########
@@ -61,37 +61,37 @@ object LuceneIndexExample {
       """.stripMargin)
 
     // 1. Compare the performance:
-
-    def time(code: => Unit): Double = {
-      val start = System.currentTimeMillis()
-      code
-      // return time in second
-      (System.currentTimeMillis() - start).toDouble / 1000
-    }
-
-    val timeWithoutLuceneIndex = time {
-
-      spark.sql(
-        s"""
-           | SELECT count(*)
-           | FROM personTable where id like '% test1 %'
-      """.stripMargin).show()
-
-    }
-
-    val timeWithLuceneIndex = time {
-
-      spark.sql(
-        s"""
-           | SELECT count(*)
-           | FROM personTable where TEXT_MATCH('id:test1')
-      """.stripMargin).show()
-
-    }
+    // TODO: Revert this after SPARK 3.1 fix

Review comment:
       can you please mention the Spark JIRA id here, so that reviewers can know the exact problem

##########
File path: examples/spark/src/main/scala/org/apache/carbondata/examples/LuceneIndexExample.scala
##########
@@ -30,7 +30,7 @@ object LuceneIndexExample {
 
   def main(args: Array[String]) {
     val spark = ExampleUtils.createSparkSession("LuceneIndexExample")
-    exampleBody(spark)
+      exampleBody(spark)

Review comment:
       revert this

##########
File path: examples/spark/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
##########
@@ -151,7 +151,7 @@ object StructuredStreamingExample {
           // Write data from socket stream to carbondata file
           qry = readSocketDF.writeStream
             .format("carbondata")
-            .trigger(ProcessingTime("5 seconds"))
+            .trigger(CarbonToSparkAdapter.getProcessingTime("5 seconds"))

Review comment:
       `getProcessingTime` calls the same API in both 2.4 and 3.1, and the `Trigger` class lives in the same package in both versions, so why do we need to put this in a version-specific package? Can you please explain?
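
For reference, the streaming trigger API being wrapped is the same public call in both versions, which is what the question above is getting at (small illustrative snippet, not from the PR):

    import org.apache.spark.sql.streaming.Trigger

    // Trigger.ProcessingTime(String) lives in org.apache.spark.sql.streaming
    // with the same signature in Spark 2.4 and Spark 3.1.
    val fiveSecondTrigger: Trigger = Trigger.ProcessingTime("5 seconds")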

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
##########
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{CarbonToSparkAdapter, SparkSession}

Review comment:
       can we keep this class in the same package structure as before?

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -360,9 +359,14 @@ object CarbonStore {
 
   private def validateTimeFormat(timestamp: String): Long = {
     try {
-      DateTimeUtils.stringToTimestamp(UTF8String.fromString(timestamp)).get
+      CarbonToSparkAdapter.stringToTimestamp(timestamp) match {
+        case Some(value) => value
+        case _ =>
+          val errorMessage = "Error: Invalid load start time format: " + timestamp

Review comment:
       no need to define the same error message here, as it is already handled in the catch block; we can just throw a runtime exception
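
A minimal sketch of the simplification the reviewer appears to have in mind, assuming the catch block already produces the user-facing message; the parser below is a hypothetical stand-in for CarbonToSparkAdapter.stringToTimestamp, not the adapter's real implementation:

    import scala.util.Try

    // hypothetical stand-in for CarbonToSparkAdapter.stringToTimestamp
    def stringToTimestampOpt(s: String): Option[Long] =
      Try(java.sql.Timestamp.valueOf(s).getTime * 1000L).toOption

    def validateTimeFormat(timestamp: String): Long =
      try {
        // the None arm just throws; the catch block below owns the error message
        stringToTimestampOpt(timestamp)
          .getOrElse(throw new RuntimeException(timestamp))
      } catch {
        case e: Exception =>
          throw new RuntimeException("Error: Invalid load start time format: " + timestamp, e)
      }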

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
##########
@@ -67,6 +68,8 @@ object DDLStrategy extends SparkStrategy {
       case changeColumn: AlterTableChangeColumnCommand
         if isCarbonTable(changeColumn.tableName) =>
         ExecutedCommandExec(DDLHelper.changeColumn(changeColumn, sparkSession)) :: Nil
+      case changeColumn: CarbonAlterTableColRenameDataTypeChangeCommand =>

Review comment:
       the command will always come as a change-column command. In which case is this required, since it wasn't there before?

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/jobs/SparkBlockletIndexLoaderJob.scala
##########
@@ -160,9 +160,7 @@ class IndexLoaderRDD(
     val reader = indexFormat.createRecordReader(inputSplit, attemptContext)
     val iter = new Iterator[(TableBlockIndexUniqueIdentifier, BlockletIndexDetailsWithSchema)] {
       // in case of success, failure or cancellation clear memory and stop execution
-      context.addTaskCompletionListener { _ =>
-        reader.close()
-      }
+      CarbonToSparkAdapter.addTaskCompletionListener(reader.close())

Review comment:
       here also I can see the same code in 3.1 and 2.4, so why is this required?

##########
File path: core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
##########
@@ -439,10 +445,31 @@ public static Object getDataDataTypeForNoDictionaryColumn(String dimensionValue,
     }
   }
 
+  private static long createTimeInstant(String dimensionValue, String dateFormat) {
+    String updatedDim = dimensionValue;
+    if (!dimensionValue.trim().contains(" ")) {
+      updatedDim += " 00:00:00";

Review comment:
       please add a comment here explaining why we are appending this string, and add a TODO to analyze and change it later once the integration completes the alpha stage

##########
File path: integration/spark/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala
##########
@@ -185,8 +224,390 @@ object CarbonToSparkAdapter {
   def getHiveExternalCatalog(sparkSession: SparkSession): HiveExternalCatalog = {
     sparkSession.sessionState.catalog.externalCatalog.asInstanceOf[HiveExternalCatalog]
   }
+
+  def createFilePartition(index: Int, files: ArrayBuffer[PartitionedFile]) = {
+    FilePartition(index, files.toArray.toSeq)
+  }
+
+  def stringToTimestamp(timestamp: String): Option[Long] = {
+    DateTimeUtils.stringToTimestamp(UTF8String.fromString(timestamp))
+  }
+
+  def getTableIdentifier(u: UnresolvedRelation): Some[TableIdentifier] = {
+    Some(u.tableIdentifier)
+  }
+
+  def dateToString(date: Int): String = {
+    DateTimeUtils.dateToString(date.toString.toInt)
+  }
+
+  def timeStampToString(timeStamp: Long): String = {
+    DateTimeUtils.timestampToString(timeStamp)
+  }
+
+  def stringToTime(value: String): java.util.Date = {
+    DateTimeUtils.stringToTime(value)
+  }
+
+  def getProcessingTime: String => Trigger = {
+    Trigger.ProcessingTime
+  }
+
+  def addTaskCompletionListener[U](f: => U) {
+    TaskContext.get().addTaskCompletionListener { context =>
+      f
+    }
+  }
+
+  def createShuffledRowRDD(sparkContext: SparkContext, localTopK: RDD[InternalRow],
+      child: SparkPlan, serializer: Serializer): ShuffledRowRDD = {
+    new ShuffledRowRDD(
+      ShuffleExchangeExec.prepareShuffleDependency(
+        localTopK, child.output, SinglePartition, serializer))
+  }
+
+  def getInsertIntoCommand(table: LogicalPlan,
+      partition: Map[String, Option[String]],
+      query: LogicalPlan,
+      overwrite: Boolean,
+      ifPartitionNotExists: Boolean): InsertIntoTable = {
+    InsertIntoTable(
+      table,
+      partition,
+      query,
+      overwrite,
+      ifPartitionNotExists)
+  }
+
+  def getExplainCommandObj(logicalPlan: LogicalPlan = OneRowRelation(),
+      mode: Option[String]) : ExplainCommand = {
+    ExplainCommand(logicalPlan, mode.isDefined)
+  }
+
+  def getExplainCommandObj(mode: Option[String]) : ExplainCommand = {
+    ExplainCommand(OneRowRelation(), mode.isDefined)
+  }
+
+  def invokeAnalyzerExecute(analyzer: Analyzer,
+      plan: LogicalPlan): LogicalPlan = {
+    analyzer.executeAndCheck(plan)
+  }
+
+  def normalizeExpressions(r: NamedExpression, attrs: AttributeSeq): NamedExpression = {
+    QueryPlan.normalizeExprId(r, attrs)
+  }
+
+  def getBuildRight: BuildSide = {
+    BuildRight
+  }
+
+  def getBuildLeft: BuildSide = {
+    BuildLeft
+  }
+
+  type CarbonBuildSideType = BuildSide
+  type InsertIntoStatementWrapper = InsertIntoTable
+
+  def withNewExecutionId[T](sparkSession: SparkSession, queryExecution: QueryExecution): T => T = {
+    SQLExecution.withNewExecutionId(sparkSession, queryExecution)(_)
+  }
+
+  def getTableIdentifier(parts: TableIdentifier): TableIdentifier = {
+    parts
+  }
+
+  def createJoinNode(child: LogicalPlan,
+      targetTable: LogicalPlan,
+      joinType: JoinType,
+      condition: Option[Expression]): Join = {
+    Join(child, targetTable, joinType, condition)
+  }
+
+  def getPartitionsFromInsert(x: InsertIntoStatementWrapper): Map[String, Option[String]] = {
+    x.partition
+  }
+
+  def getStatisticsObj(outputList: Seq[NamedExpression],

Review comment:
       I think no one is using this method; it can be removed if there are no callers

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/test/SparkTestQueryExecutor.scala
##########
@@ -71,6 +71,12 @@ object SparkTestQueryExecutor {
     .config("spark.sql.warehouse.dir", warehouse)
     .config("spark.sql.crossJoin.enabled", "true")
     .config("spark.sql.extensions", extensions)
+    .config("spark.sql.storeAssignmentPolicy", "legacy")
+    .config("spark.sql.legacy.timeParserPolicy", "legacy")

Review comment:
       add a TODO to write test cases for the non-legacy settings
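
As a pointer for that TODO, a hedged sketch of what a non-legacy test session might look like; both config keys are standard Spark 3 settings, and the values shown are the non-legacy ones rather than anything prescribed by this PR:

    import org.apache.spark.sql.SparkSession

    // illustrative only: a session that exercises the non-legacy behaviour
    val nonLegacySession = SparkSession.builder()
      .master("local[2]")
      .appName("NonLegacyPolicyTest")
      // ANSI is the Spark 3 default; "legacy" above relaxes insert-time type checks
      .config("spark.sql.storeAssignmentPolicy", "ANSI")
      // use the new java.time-based parser instead of the legacy SimpleDateFormat one
      .config("spark.sql.legacy.timeParserPolicy", "CORRECTED")
      .getOrCreate()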

##########
File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala
##########
@@ -229,8 +270,388 @@ object CarbonToSparkAdapter {
       .unwrapped
       .asInstanceOf[HiveExternalCatalog]
   }
+
+  def createFilePartition(index: Int, files: ArrayBuffer[PartitionedFile]): FilePartition = {
+    FilePartition(index, files.toArray)
+  }
+
+  def stringToTimestamp(timestamp: String): Option[Long] = {
+    DateTimeUtils.stringToTimestamp(UTF8String.fromString(timestamp))
+  }
+
+  def stringToTime(value: String): java.util.Date = {
+    DateTimeUtils.stringToTime(value)
+  }
+
+  def timeStampToString(timeStamp: Long): String = {
+    DateTimeUtils.timestampToString(timeStamp)
+  }
+
+  def getTableIdentifier(u: UnresolvedRelation): Some[TableIdentifier] = {
+    Some(u.tableIdentifier)
+  }
+
+  def dateToString(date: Int): String = {
+    DateTimeUtils.dateToString(date.toString.toInt)
+  }
+
+  def getProcessingTime: String => Trigger = {
+    Trigger.ProcessingTime
+  }
+
+  def addTaskCompletionListener[U](f: => U) {
+    TaskContext.get().addTaskCompletionListener { context =>
+      f
+    }
+  }
+
+  def createShuffledRowRDD(sparkContext: SparkContext, localTopK: RDD[InternalRow],
+      child: SparkPlan, serializer: Serializer): ShuffledRowRDD = {
+    new ShuffledRowRDD(
+      ShuffleExchangeExec.prepareShuffleDependency(
+        localTopK, child.output, SinglePartition, serializer))
+  }
+
+  def getInsertIntoCommand(table: LogicalPlan,
+      partition: Map[String, Option[String]],
+      query: LogicalPlan,
+      overwrite: Boolean,
+      ifPartitionNotExists: Boolean): InsertIntoTable = {
+    InsertIntoTable(
+      table,
+      partition,
+      query,
+      overwrite,
+      ifPartitionNotExists)
+  }
+
+  def getExplainCommandObj(logicalPlan: LogicalPlan = OneRowRelation(),
+      mode: Option[String]) : ExplainCommand = {
+    ExplainCommand(logicalPlan, mode.isDefined)
+  }
+
+  def invokeAnalyzerExecute(analyzer: Analyzer,
+      plan: LogicalPlan): LogicalPlan = {
+    analyzer.executeAndCheck(plan)
+  }
+
+  def normalizeExpressions(r: NamedExpression, attrs: AttributeSeq): NamedExpression = {
+    QueryPlan.normalizeExprId(r, attrs)
+  }
+
+  def getBuildRight: BuildSide = {
+    BuildRight
+  }
+
+  def getBuildLeft: BuildSide = {
+    BuildLeft
+  }
+
+  type CarbonBuildSideType = BuildSide
+  type InsertIntoStatementWrapper = InsertIntoTable
+
+  def withNewExecutionId[T](sparkSession: SparkSession, queryExecution: QueryExecution): T => T = {
+    SQLExecution.withNewExecutionId(sparkSession, queryExecution)(_)
+  }
+
+  def createJoinNode(child: LogicalPlan,
+      targetTable: LogicalPlan,
+      joinType: JoinType,
+      condition: Option[Expression]): Join = {
+    Join(child, targetTable, joinType, condition)
+  }
+
+  def getPartitionsFromInsert(x: InsertIntoStatementWrapper): Map[String, Option[String]] = {
+    x.partition
+  }
+
+  def getTableIdentifier(parts: TableIdentifier): TableIdentifier = {
+    parts
+  }
+
+  def getStatisticsObj(outputList: Seq[NamedExpression],
+      plan: LogicalPlan, stats: Statistics,
+      aliasMap: Option[AttributeMap[Attribute]] = None): Statistics = {
+    val output = outputList.map(_.toAttribute)
+    val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
+      table => AttributeMap(table.output.zip(output))
+    }
+    val rewrites = mapSeq.head
+    val attributes: AttributeMap[ColumnStat] = stats.attributeStats
+    var attributeStats = AttributeMap(attributes.iterator
+      .map { pair => (rewrites(pair._1), pair._2) }.toSeq)
+    if (aliasMap.isDefined) {
+      attributeStats = AttributeMap(
+        attributeStats.map(pair => (aliasMap.get(pair._1), pair._2)).toSeq)
+    }
+    Statistics(stats.sizeInBytes, stats.rowCount, attributeStats, stats.hints)
+  }
+
+  def createRefreshTableCommand(tableIdentifier: TableIdentifier): RefreshTable = {
+    RefreshTable(tableIdentifier)
+  }
+
+  type RefreshTables = RefreshTable

Review comment:
       as observed, there is a lot of code common to 2.3 and 3.1, so please do the refactoring as suggested in the previous comment

##########
File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/CarbonBoundReference.scala
##########
@@ -42,4 +42,4 @@ case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nu
   override def qualifier: Seq[String] = null
 
   override def newInstance(): NamedExpression = throw new UnsupportedOperationException
-}
\ No newline at end of file
+}

Review comment:
       revert this

##########
File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.strategy
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.CarbonInputMetrics
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.CarbonDatasourceHadoopRelation
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, SortOrder, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{Expression => SparkExpression}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.execution.{ColumnarBatchScan, DataSourceScanExec, WholeStageCodegenExec}
+import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.spark.sql.types.AtomicType
+
+import org.apache.carbondata.core.index.IndexFilter
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.BucketingInfo
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.scan.expression.logical.AndExpression
+import org.apache.carbondata.hadoop.CarbonProjection
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
+
+/**
+ *  Physical plan node for scanning data. It is applied for both tables
+ *  USING carbondata and STORED AS carbondata.
+ */
+case class CarbonDataSourceScan(

Review comment:
       this class has a lot of code duplicated from 2.3; please check and refactor

##########
File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/hive/CarbonAnalyzer.scala
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.CarbonReflectionUtils
+
+class CarbonAnalyzer(catalog: SessionCatalog,

Review comment:
       this is the same for 2.3 and 2.4; please refactor it into a common package

##########
File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/hive/execution/command/CarbonResetCommand.scala
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution.command
+
+import org.apache.spark.sql.{CarbonEnv, CarbonToSparkAdapter, Row, SparkSession}

Review comment:
       same as 2.3; please move it to the common package

##########
File path: examples/spark/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala
##########
@@ -93,6 +93,7 @@ object ExampleUtils {
       .config("spark.driver.host", "localhost")
       .config("spark.sql.crossJoin.enabled", "true")
       .config("spark.sql.extensions", "org.apache.spark.sql.CarbonExtensions")
+      .config("spark.sql.legacy.timeParserPolicy", "LEGACY")

Review comment:
       please add a comment here as a reminder to analyze this and add tests for policies other than LEGACY

##########
File path: integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
##########
@@ -111,14 +111,13 @@ class TestCarbonPartitionWriter extends QueryTest with BeforeAndAfterAll {
         s"(SELECT * FROM $tableName" + "_segments)").collect()
       assert(rows.length > 0)
       for (index <- 0 until unloadedStageCount) {
-        val row = rows(index)
         assert(rows(index).getString(0) == null)
         assert(rows(index).getString(1).equals("Unload"))
         assert(rows(index).getString(2) != null)
-        assert(rows(index).getLong(3) == -1)
+        assert(rows(index).getString(3) == "-1")

Review comment:
       why do we need to change this here? Can you please explain the reason behind it?

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
##########
@@ -183,7 +183,7 @@ object DataLoadProcessorStepOnSpark {
     val rowConverter = new RowConverterImpl(conf.getDataFields, conf, badRecordLogger)
     rowConverter.initialize()
 
-    TaskContext.get().addTaskCompletionListener { context =>
+    CarbonToSparkAdapter.addTaskCompletionListener {

Review comment:
       here also I don't see any difference between 2.4 and 3.1; why was this refactored? Can you please explain?

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
##########
@@ -321,7 +321,7 @@ object DeleteExecution {
           deleteStatus = SegmentStatus.SUCCESS
         } catch {
           case e : MultipleMatchingException =>
-            LOGGER.error(e.getMessage)
+          LOGGER.error(e.getMessage)

Review comment:
       revert this

##########
File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala
##########
@@ -0,0 +1,735 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.net.URI
+import java.sql.{Date, Timestamp}
+import java.time.ZoneId
+
+import javax.xml.bind.DatatypeConverter
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.antlr.v4.runtime.tree.TerminalNode
+import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule
+import org.apache.spark.sql.catalyst.{CarbonParserUtil, InternalRow, QueryPlanningTracker, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, ExternalCatalogWithListener, SessionCatalog}
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSeq, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, Predicate, ScalaUDF, SortOrder, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide, Optimizer}
+import org.apache.spark.sql.catalyst.parser.ParserUtils.operationNotAllowed
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{BucketSpecContext, ColTypeListContext, CreateTableHeaderContext, LocationSpecContext, PartitionFieldListContext, QueryContext, SkewSpecContext, TablePropertyListContext}
+import org.apache.spark.sql.catalyst.plans.{JoinType, QueryPlan, logical}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, InsertIntoStatement, Join, JoinHint, LogicalPlan, OneRowRelation, QualifiedColType, Statistics, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode}
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter}
+import org.apache.spark.sql.execution.{ExplainMode, QueryExecution, SQLExecution, ShuffledRowRDD, SimpleMode, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.command.{ExplainCommand, Field, PartitionerField, RefreshTableCommand, TableModel, TableNewProcessor}
+import org.apache.spark.sql.execution.command.table.{CarbonCreateTableAsSelectCommand, CarbonCreateTableCommand}
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FilePartition, PartitionedFile}
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.metric.SQLShuffleWriteMetricsReporter
+import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
+import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.internal.{SessionState, SharedState}
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule, MVRewriteRule}
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil.{checkIfDuplicateColumnExists, convertDbNameToLowerCase, validateStreamingProperty}
+import org.apache.spark.sql.secondaryindex.optimizer.CarbonSITransformationRule
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.sql.types.{AbstractDataType, CharType, DataType, Metadata, StringType, StructField, VarcharType}
+import org.apache.spark.sql.util.SparkSQLUtil
+import org.apache.spark.unsafe.types.UTF8String
+
+import org.apache.carbondata.common.exceptions.DeprecatedFeatureException
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.SchemaReader
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.geo.{InPolygonJoinUDF, ToRangeListAsStringUDF}
+import org.apache.carbondata.mv.plans.modular.{GroupBy, ModularPlan, Select}
+import org.apache.carbondata.spark.CarbonOption
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+object CarbonToSparkAdapter {
+
+  def addSparkSessionListener(sparkSession: SparkSession): Unit = {
+    sparkSession.sparkContext.addSparkListener(new SparkListener {
+      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+        CarbonEnv.carbonEnvMap.remove(sparkSession)
+        ThreadLocalSessionInfo.unsetAll()
+      }
+    })
+  }
+
+  def addSparkListener(sparkContext: SparkContext): Unit = {
+    sparkContext.addSparkListener(new SparkListener {
+      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+        SparkSession.setDefaultSession(null)
+      }
+    })
+  }
+
+  def createAttributeReference(
+      name: String,
+      dataType: DataType,
+      nullable: Boolean,
+      metadata: Metadata,
+      exprId: ExprId,
+      qualifier: Option[String],
+      attrRef : NamedExpression = null): AttributeReference = {
+    val qf = if (qualifier.nonEmpty) Seq(qualifier.get) else Seq.empty
+    AttributeReference(
+      name,
+      dataType,
+      nullable,
+      metadata)(exprId, qf)
+  }
+
+  def createAttributeReference(
+      name: String,
+      dataType: DataType,
+      nullable: Boolean,
+      metadata: Metadata,
+      exprId: ExprId,
+      qualifier: Seq[String]): AttributeReference = {
+    AttributeReference(
+      name,
+      dataType,
+      nullable,
+      metadata)(exprId, qualifier)
+  }
+
+  def lowerCaseAttribute(expression: Expression): Expression = expression.transform {
+    case attr: AttributeReference =>
+      CarbonToSparkAdapter.createAttributeReference(
+        attr.name.toLowerCase,
+        attr.dataType,
+        attr.nullable,
+        attr.metadata,
+        attr.exprId,
+        attr.qualifier)
+  }
+
+  def createAttributeReference(attr: AttributeReference,
+      attrName: String,
+      newSubsume: String): AttributeReference = {
+    AttributeReference(attrName, attr.dataType)(
+      exprId = attr.exprId,
+      qualifier = newSubsume.split("\n").map(_.trim))
+  }
+
+  def createScalaUDF(s: ScalaUDF, reference: AttributeReference): ScalaUDF = {
+    s.copy(children = Seq(reference))
+  }
+
+  def createExprCode(code: String, isNull: String, value: String, dataType: DataType): ExprCode = {
+    ExprCode(
+      code"$code",
+      JavaCode.isNullVariable(isNull),
+      JavaCode.variable(value, dataType))
+  }
+
+  def createAliasRef(
+      child: Expression,
+      name: String,
+      exprId: ExprId = NamedExpression.newExprId,
+      qualifier: Seq[String] = Seq.empty,
+      explicitMetadata: Option[Metadata] = None,
+      namedExpr: Option[NamedExpression] = None) : Alias = {

Review comment:
       looks like `namedExpr` is not used; please check the other files as well and remove it

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsAsSelectCommand.scala
##########
@@ -46,12 +49,25 @@ case class CarbonShowSegmentsAsSelectCommand(
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
   }
-  private lazy val df = createDataFrame
+
+  val tempViewName: String = makeTempViewName(carbonTable)
+
+  val df: DataFrame = createDataFrame
 
   override def output: Seq[Attribute] = {
-    df.queryExecution.analyzed.output.map { attr =>
-      AttributeReference(attr.name, attr.dataType, nullable = true)()
+    var attrList: Seq[Attribute] = Seq()

Review comment:
       should this Seq be declared at class level in case output is called multiple times? @kunal642 @vikramahuja1001
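
A toy sketch of the suggestion, assuming output may be invoked more than once per command instance (the class and field names here are hypothetical, not from the PR):

    // hoist a repeatedly-built Seq into a lazily-initialised class-level field
    class ShowSegmentsOutput(rawNames: Seq[String]) {
      // built once on first access, reused on every subsequent call to output
      private lazy val cachedOutput: Seq[String] = rawNames.map(_.trim.toLowerCase)

      def output: Seq[String] = cachedOutput
    }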

##########
File path: index/examples/pom.xml
##########
@@ -81,9 +81,6 @@
   <profiles>
     <profile>
       <id>spark-2.3</id>
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>

Review comment:
       do we need to add a new profile here and make it active by default? @kunal642 @vikramahuja1001

##########
File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/SparkSqlAdapter.scala
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Attribute, EmptyRow, Expression}
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.types.StructType
+
+object SparkSqlAdapter {
+
+  def initSparkSQL(): Unit = {
+  }
+
+  def getScanForSegments(
+      @transient relation: HadoopFsRelation,

Review comment:
       I don't see any difference across the profiles; please check and remove this

##########
File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonBoundReference.scala
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, ExprId, LeafExpression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.types.DataType
+
+import org.apache.carbondata.core.scan.expression.ColumnExpression
+
+case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nullable: Boolean)

Review comment:
       I can see this class is the same across all profiles; what is the change here?

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
##########
@@ -181,7 +182,7 @@ object CarbonSparkUtil {
    */
   def createHadoopJob(conf: Configuration = FileFactory.getConfiguration): Job = {
     val jobConf = new JobConf(conf)
-    SparkHadoopUtil.get.addCredentials(jobConf)
+    SparkUtil.addCredentials(jobConf)

Review comment:
       no need to add a method for just one line, and it is only called from here, so I feel we can revert this change

##########
File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

Review comment:
       same as 2.3; please move it to the common package

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeProjection.scala
##########
@@ -82,10 +83,10 @@ case class MergeProjection(
       if (output.contains(null)) {
         throw new CarbonMergeDataSetException(s"Not all columns are mapped")
       }
-      (new InterpretedMutableProjection(output ++ Seq(
+      (output ++ Seq(
         ds.queryExecution.analyzed.resolveQuoted(statusCol,
           sparkSession.sessionState.analyzer.resolver).get),
-        ds.queryExecution.analyzed.output), expectOutput)

Review comment:
       why is `expectOutput` removed here?

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala
##########
@@ -78,10 +78,8 @@ case class CarbonCreateDataSourceTableCommand(
       catalogTable.partitionColumnNames,
       caseSensitiveAnalysis)
     val rows = try {
-      CreateDataSourceTableCommand(
-        catalogTable,
-        ignoreIfExists
-      ).run(sparkSession)
+      org.apache.spark.sql.execution.CreateDataSourceTableCommand
+        .createDataSource(catalogTable, ignoreIfExists, sparkSession)

Review comment:
       I can see the same `CreateDataSourceTableCommand` code in both 3.1 and 2.4, so it is better to put it in a common package and keep it clean instead of duplicating it; we followed the same approach in previous upgrades as well

##########
File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala
##########
@@ -0,0 +1,735 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.net.URI
+import java.sql.{Date, Timestamp}
+import java.time.ZoneId
+
+import javax.xml.bind.DatatypeConverter
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.antlr.v4.runtime.tree.TerminalNode
+import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule
+import org.apache.spark.sql.catalyst.{CarbonParserUtil, InternalRow, QueryPlanningTracker, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, ExternalCatalogWithListener, SessionCatalog}
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSeq, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, Predicate, ScalaUDF, SortOrder, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide, Optimizer}
+import org.apache.spark.sql.catalyst.parser.ParserUtils.operationNotAllowed
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{BucketSpecContext, ColTypeListContext, CreateTableHeaderContext, LocationSpecContext, PartitionFieldListContext, QueryContext, SkewSpecContext, TablePropertyListContext}
+import org.apache.spark.sql.catalyst.plans.{JoinType, QueryPlan, logical}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, InsertIntoStatement, Join, JoinHint, LogicalPlan, OneRowRelation, QualifiedColType, Statistics, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode}
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter}
+import org.apache.spark.sql.execution.{ExplainMode, QueryExecution, SQLExecution, ShuffledRowRDD, SimpleMode, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.command.{ExplainCommand, Field, PartitionerField, RefreshTableCommand, TableModel, TableNewProcessor}
+import org.apache.spark.sql.execution.command.table.{CarbonCreateTableAsSelectCommand, CarbonCreateTableCommand}
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FilePartition, PartitionedFile}
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.metric.SQLShuffleWriteMetricsReporter
+import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
+import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.internal.{SessionState, SharedState}
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule, MVRewriteRule}
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil.{checkIfDuplicateColumnExists, convertDbNameToLowerCase, validateStreamingProperty}
+import org.apache.spark.sql.secondaryindex.optimizer.CarbonSITransformationRule
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.sql.types.{AbstractDataType, CharType, DataType, Metadata, StringType, StructField, VarcharType}
+import org.apache.spark.sql.util.SparkSQLUtil
+import org.apache.spark.unsafe.types.UTF8String
+
+import org.apache.carbondata.common.exceptions.DeprecatedFeatureException
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.SchemaReader
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.geo.{InPolygonJoinUDF, ToRangeListAsStringUDF}
+import org.apache.carbondata.mv.plans.modular.{GroupBy, ModularPlan, Select}
+import org.apache.carbondata.spark.CarbonOption
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+object CarbonToSparkAdapter {

Review comment:
       please pull out the common code between 2.4 and 3.1 and move it to a common package to avoid code duplication

##########
File path: examples/spark/src/main/scala/org/apache/carbondata/examples/LuceneIndexExample.scala
##########
@@ -60,40 +60,6 @@ object LuceneIndexExample {
          | AS 'lucene'
       """.stripMargin)
 
-    // 1. Compare the performance:
-
-    def time(code: => Unit): Double = {
-      val start = System.currentTimeMillis()
-      code
-      // return time in second
-      (System.currentTimeMillis() - start).toDouble / 1000
-    }
-
-    val timeWithoutLuceneIndex = time {
-
-      spark.sql(
-        s"""
-           | SELECT count(*)
-           | FROM personTable where id like '% test1 %'
-      """.stripMargin).show()
-
-    }
-
-    val timeWithLuceneIndex = time {
-
-      spark.sql(

Review comment:
       why was this code removed?

##########
File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala
##########
@@ -0,0 +1,734 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.net.URI
+import java.sql.{Date, Timestamp}
+import java.time.ZoneId
+
+import javax.xml.bind.DatatypeConverter
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import org.antlr.v4.runtime.tree.TerminalNode
+import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule
+import org.apache.spark.sql.catalyst.{CarbonParserUtil, InternalRow, QueryPlanningTracker, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedRelation}

Review comment:
       the import order doesn't comply with the scalastyle rules; please correct it. I also wonder how the CI is passing with style errors; please check the new CI, correct it, and make sure to check all the new files once for style issues, since the CI now reports the errors directly in red.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/joins/BroadCastPolygonFilterPushJoin.scala
##########
@@ -47,16 +45,13 @@ case class BroadCastPolygonFilterPushJoin(
     leftKeys: Seq[Expression],
     rightKeys: Seq[Expression],
     joinType: JoinType,
-    buildSide: BuildSide,
     condition: Option[Expression],
     left: SparkPlan,
     right: SparkPlan
-) extends BinaryExecNode with HashJoin {

Review comment:
       why was `HashJoin` removed here?

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
##########
@@ -704,5 +705,13 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
     columnSchema
   }
 
+  override def clone(): LogicalPlan = {
+    CarbonCreateSecondaryIndexCommand(indexModel,

Review comment:
       why is this required?

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
##########
@@ -501,40 +321,6 @@ object CarbonSparkSqlParserUtil {
     }
   }
 
-  /**
-   * Validates the partition columns and return's A tuple of partition columns and partitioner
-   * fields.
-   *
-   * @param partitionColumns        An instance of ColTypeListContext having parser rules for
-   *                                column.
-   * @param colNames                <Seq[String]> Sequence of Table column names.
-   * @param tableProperties         <Map[String, String]> Table property map.
-   * @param partitionByStructFields Seq[StructField] Sequence of partition fields.
-   * @return <Seq[PartitionerField]> A Seq of partitioner fields.
-   */
-  def validatePartitionFields(

Review comment:
       this method and the create-table method above look the same in both 3.1 and 2.4 with only a small diff; as mentioned in one of the previous comments, if the code is common to 2.4 and 3.1 we should add a common package and do a clean refactor

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
##########
@@ -604,11 +605,12 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
 
 
   protected lazy val alterTableColumnRenameAndModifyDataType: Parser[LogicalPlan] =
-    ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ CHANGE ~ ident ~ ident ~
-    ident ~ opt("(" ~> rep1sep(valueOptions, ",") <~ ")") <~ opt(";") ^^ {
-      case dbName ~ table ~ change ~ columnName ~ columnNameCopy ~ dataType ~ values =>
+    ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (CHANGE ~> ident) ~ ident ~ ident ~

Review comment:
       please update the DDL in the documentation, add an example with a comment, and add test cases with a comment, since the comment is now changed here rather than via properties

##########
File path: integration/spark/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala
##########
@@ -185,8 +224,390 @@ object CarbonToSparkAdapter {
   def getHiveExternalCatalog(sparkSession: SparkSession): HiveExternalCatalog = {
     sparkSession.sessionState.catalog.externalCatalog.asInstanceOf[HiveExternalCatalog]
   }
+
+  def createFilePartition(index: Int, files: ArrayBuffer[PartitionedFile]) = {
+    FilePartition(index, files.toArray.toSeq)
+  }
+
+  def stringToTimestamp(timestamp: String): Option[Long] = {
+    DateTimeUtils.stringToTimestamp(UTF8String.fromString(timestamp))
+  }
+
+  def getTableIdentifier(u: UnresolvedRelation): Some[TableIdentifier] = {
+    Some(u.tableIdentifier)
+  }
+
+  def dateToString(date: Int): String = {
+    DateTimeUtils.dateToString(date.toString.toInt)
+  }
+
+  def timeStampToString(timeStamp: Long): String = {
+    DateTimeUtils.timestampToString(timeStamp)
+  }

Review comment:
       Many of these methods look like duplicates of the 2.4 versions; please refactor them as suggested in the earlier comment.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/util/CreateTableCommonUtil.scala
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util
+
+import org.apache.log4j.Logger
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
+import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation}
+import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
+
+object CreateTableCommonUtil {
+
+  def getNewTable(sparkSession: SparkSession, sessionState: SessionState,

Review comment:
       Add a comment for the method and rename it to `getCatalogTable`.
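
       Something along these lines could work for the method comment (the wording is only a suggestion, inferred from how the result is used in CreateDataSourceTableCommand below), placed above the renamed `getCatalogTable`:

           /**
            * Resolves the DataSource for the table being created and returns the CatalogTable,
            * with the resolved schema and storage format, that should be registered in the
            * session catalog.
            */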

##########
File path: integration/spark/src/main/spark2.3/org/apache/spark/sql/hive/execution/command/CarbonResetCommand.scala
##########
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive.execution.command
 
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, CarbonToSparkAdapter, Row, SparkSession}

Review comment:
       Revert this import change if it is not required.

##########
File path: integration/spark/src/main/spark2.3/org/apache/spark/sql/execution/CarbonCodegenSupport.scala
##########
@@ -14,15 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.spark.sql.execution
 
-package org.apache.carbondata.spark.adapter
+import org.apache.spark.sql.execution.joins.HashJoin
 
-import scala.collection.mutable.ArrayBuffer
+trait CarbonCodegenSupport extends SparkPlan with HashJoin {

Review comment:
       Add a comment to the trait explaining why it is required. Since the trait already extends `HashJoin`, I think the implementing classes do not need to mix in `HashJoin` again; please check once. A possible shape is sketched below.
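
       A minimal sketch of the requested comment for the 2.3 profile (the wording is an assumption, not the final text):

           package org.apache.spark.sql.execution

           import org.apache.spark.sql.execution.joins.HashJoin

           /**
            * Version-specific bridge trait for the Carbon join nodes. On this profile HashJoin
            * still provides the codegen hooks, so the join implementations only mix in
            * CarbonCodegenSupport and pick up HashJoin through it.
            */
           trait CarbonCodegenSupport extends SparkPlan with HashJoin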

##########
File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/execution/CreateDataSourceTableCommand.scala
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.log4j.Logger
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.util.CreateTableCommonUtil.getNewTable
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+
+case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boolean)
+  extends RunnableCommand {
+
+  val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName)
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    assert(table.tableType != CatalogTableType.VIEW)
+    assert(table.provider.isDefined)
+    val sessionState = sparkSession.sessionState
+    if (sessionState.catalog.tableExists(table.identifier)) {
+      if (ignoreIfExists) {
+        return Seq.empty[Row]
+      } else {
+        throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.")
+      }
+    }
+    val newTable: CatalogTable = getNewTable(sparkSession, sessionState, table, LOGGER)
+
+    // We will return Nil or throw exception at the beginning if the table already exists, so when
+    // we reach here, the table should not exist and we should set `ignoreIfExists` to false.
+    sessionState.catalog.createTable(newTable, ignoreIfExists = false, validateLocation = false)
+    Seq.empty[Row]
+  }
+}
+
+object CreateDataSourceTableCommand {

Review comment:
       I think this can be combined with the 2.3 version.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
##########
@@ -214,9 +214,7 @@ class CarbonSIRebuildRDD[K, V](
           new SparkDataTypeConverterImpl)
 
         // add task completion listener to clean up the resources
-        context.addTaskCompletionListener { _ =>
-          close()
-        }
+        CarbonToSparkAdapter.addTaskCompletionListener(close())

Review comment:
       Same as the earlier comment.

##########
File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/execution/CarbonCodegenSupport.scala
##########
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.execution.joins.HashJoin
+
+trait CarbonCodegenSupport extends SparkPlan with HashJoin {

Review comment:
       Same comment as for this class in the 2.3 profile. Also, please move it to a common package and keep it there, since there is no change between 2.3 and 2.4.

##########
File path: integration/spark/src/main/spark2.3/org/apache/spark/sql/execution/CreateDataSourceTableCommand.scala
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+
+object CreateDataSourceTableCommand {

Review comment:
       Remove this from here and add it to `CarbonToSparkAdapter`, similar to the other profiles.

##########
File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.parser.ParserUtils.string
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, CreateHiveTableContext}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkSqlAstBuilder
+import org.apache.spark.sql.internal.SQLConf

Review comment:
       This is the same for 2.3 and 2.4; please move it to the common package.

##########
File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.CarbonToSparkAdapter

Review comment:
       This is the same as 2.3; please refactor and move it to the common package.

##########
File path: integration/spark/src/main/spark2.4/org/apache/spark/sql/parser/CarbonExtensionSqlParser.scala
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parser
+

Review comment:
       Same as 2.3; please move it to the common package.

##########
File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/avro/AvroFileFormatFactory.scala
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.avro

Review comment:
       This is the same code as 2.4; please move it to the common package.

##########
File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/execution/CarbonCodegenSupport.scala
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.execution.joins.HashJoin
+
+trait CarbonCodegenSupport extends SparkPlan with HashJoin {
+
+  // TODO: Spark has started supporting Codegen for Join, Carbon needs to implement the same.
+  override def supportCodegen: Boolean = false

Review comment:
       You can create a sub-JIRA under this one and track the codegen support there.

##########
File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/execution/CreateDataSourceTableCommand.scala
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.log4j.Logger
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.util.CreateTableCommonUtil.getNewTable
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+
+case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boolean)
+  extends RunnableCommand {
+
+  val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName)
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    assert(table.tableType != CatalogTableType.VIEW)
+    assert(table.provider.isDefined)
+
+    val sessionState = sparkSession.sessionState
+    if (sessionState.catalog.tableExists(table.identifier)) {
+      if (ignoreIfExists) {
+        return Seq.empty[Row]
+      } else {
+        throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.")
+      }

Review comment:
       I can see the same code for 2.4 and 3.1; please refactor it into a common place. Also add a newline at the end of the file; it is a style issue, please check why the new CI is not catching it.




--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] akashrn5 commented on a change in pull request #4141: [CARBONDATA-4190] Integrate Carbondata with Spark 3.1.1 version

GitBox
In reply to this post by GitBox

akashrn5 commented on a change in pull request #4141:
URL: https://github.com/apache/carbondata/pull/4141#discussion_r639690258



##########
File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.strategy
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.CarbonInputMetrics
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonToSparkAdapter}
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, SortOrder, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{Expression => SparkExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.execution.{DataSourceScanExec, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import org.apache.carbondata.core.index.IndexFilter
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.BucketingInfo
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.scan.expression.logical.AndExpression
+import org.apache.carbondata.hadoop.CarbonProjection
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
+
+/**
+ *  Physical plan node for scanning data. It is applied for both tables
+ *  USING carbondata and STORED AS carbondata.
+ */
+case class CarbonDataSourceScan(
+    @transient relation: CarbonDatasourceHadoopRelation,
+    output: Seq[Attribute],
+    partitionFilters: Seq[SparkExpression],
+    dataFilters: Seq[SparkExpression],
+    @transient readComittedScope: ReadCommittedScope,
+    @transient pushedDownProjection: CarbonProjection,
+    @transient pushedDownFilters: Seq[Expression],
+    directScanSupport: Boolean,
+    @transient extraRDD: Option[(RDD[InternalRow], Boolean)] = None,
+    tableIdentifier: Option[TableIdentifier] = None,
+    segmentIds: Option[String] = None)
+  extends DataSourceScanExec {
+
+  override lazy val supportsColumnar: Boolean = CarbonPlanHelper
+    .supportBatchedDataSource(sqlContext, output, extraRDD)
+
+  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val numOutputRows = longMetric("numOutputRows")
+    inputRDD.asInstanceOf[RDD[ColumnarBatch]].mapPartitionsInternal { batches =>
+      new Iterator[ColumnarBatch] {
+
+        override def hasNext: Boolean = {
+          val res = batches.hasNext
+          res
+        }
+
+        override def next(): ColumnarBatch = {
+          val batch = batches.next()
+          numOutputRows += batch.numRows()
+          batch
+        }
+      }
+    }
+  }
+
+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+
+  lazy val needsUnsafeRowConversion: Boolean = { true }
+
+  override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
+    val info: BucketingInfo = relation.carbonTable.getBucketingInfo
+    if (info != null) {
+      val cols = info.getListOfColumns.asScala
+      val numBuckets = info.getNumOfRanges
+      val bucketColumns = cols.flatMap { n =>
+        val attrRef = output.find(_.name.equalsIgnoreCase(n.getColumnName))
+        attrRef match {
+          case Some(attr: AttributeReference) =>
+            Some(AttributeReference(attr.name,
+              CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(n.getDataType),
+              attr.nullable,
+              attr.metadata)(attr.exprId, attr.qualifier))
+          case _ => None
+        }
+      }
+      if (bucketColumns.size == cols.size) {
+        // use HashPartitioning will not shuffle
+        (HashPartitioning(bucketColumns, numBuckets), Nil)
+      } else {
+        (UnknownPartitioning(0), Nil)
+      }
+    } else {
+      (UnknownPartitioning(0), Nil)
+    }
+  }
+
+  override lazy val metadata: Map[String, String] = {
+    def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
+    val metadata =
+      Map(
+        "ReadSchema" -> seqToString(pushedDownProjection.getAllColumns),
+        "Batched" -> supportsColumnar.toString,
+        "DirectScan" -> (supportsColumnar && directScanSupport).toString,
+        "PushedFilters" -> seqToString(pushedDownFilters.map(_.getStatement)))
+    if (relation.carbonTable.isHivePartitionTable) {
+      metadata + ("PartitionFilters" -> seqToString(partitionFilters)) +
+        ("PartitionCount" -> selectedPartitions.size.toString)
+    } else {
+      metadata
+    }
+  }
+
+  @transient private lazy val indexFilter: IndexFilter = {
+    val filter = pushedDownFilters.reduceOption(new AndExpression(_, _))
+      .map(new IndexFilter(relation.carbonTable, _, true)).orNull
+    if (filter != null && pushedDownFilters.length == 1) {
+      // push down the limit if only one filter
+      filter.setLimit(relation.limit)
+    }
+    filter
+  }
+
+  @transient private lazy val selectedPartitions: Seq[PartitionSpec] = {
+    CarbonFilters
+      .getPartitions(partitionFilters, relation.sparkSession, relation.carbonTable)
+      .orNull
+  }
+
+  private lazy val inputRDD: RDD[InternalRow] = {
+    val carbonRdd = new CarbonScanRDD[InternalRow](
+      relation.sparkSession,
+      pushedDownProjection,
+      indexFilter,
+      relation.identifier,
+      relation.carbonTable.getTableInfo.serialize(),
+      relation.carbonTable.getTableInfo,
+      new CarbonInputMetrics,
+      selectedPartitions,
+      segmentIds = segmentIds)
+    carbonRdd.setVectorReaderSupport(supportsColumnar)
+    carbonRdd.setDirectScanSupport(supportsColumnar && directScanSupport)
+    extraRDD.map(_._1.union(carbonRdd)).getOrElse(carbonRdd)
+  }
+

Review comment:
       In this class too, the common code should move to the common package; the ColumnarBatch handling can stay specific to 3.1.

##########
File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/parser/CarbonExtensionSqlParser.scala
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parser
+
+import org.apache.spark.sql.{CarbonEnv, CarbonThreadUtil, SparkSession}
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkSqlParser
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.util.CarbonException
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * parser order: carbon parser => spark parser
+ */
+class CarbonExtensionSqlParser(

Review comment:
       This can be combined with the 2.3 and 2.4 versions.

##########
File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.util.concurrent.Callable
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition, ExternalCatalogWithListener, FunctionResourceLoader, GlobalTempViewManager}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.execution.strategy.{CarbonSourceStrategy, DDLStrategy, DMLStrategy, StreamingTableStrategy}
+import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule}
+import org.apache.spark.sql.parser.CarbonSparkSqlParser
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+
+/**
+ * This class will have carbon catalog and refresh the relation from cache if the carbontable in
+ * carbon catalog is not same as cached carbon relation's carbon table
+ *
+ * @param externalCatalog
+ * @param globalTempViewManager
+ * @param sparkSession
+ * @param functionResourceLoader
+ * @param functionRegistry
+ * @param conf
+ * @param hadoopConf
+ */
+class CarbonHiveSessionCatalog(

Review comment:
       This class can be combined with the 2.3 and 2.4 versions and moved to the common package.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
##########
@@ -240,6 +235,21 @@ object DMLStrategy extends SparkStrategy {
     condition.get.asInstanceOf[ScalaUDF].function.isInstanceOf[InPolygonJoinRangeListUDF]
   }
 
+  object CarbonExtractEquiJoinKeys {
+    def unapply(plan: LogicalPlan): Option[(JoinType, Seq[Expression], Seq[Expression],
+      Option[Expression], LogicalPlan, LogicalPlan)] = {
+      plan match {
+        case join: Join =>
+          ExtractEquiJoinKeys.unapply(join) match {
+              // ignoring hints as carbon is not using them right now

Review comment:
       Add a TODO here to support joins with hints; see the sketch below.
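
       Something like the following, placed just above the existing "ignoring hints" comment (the wording, and the note about what ExtractEquiJoinKeys returns on 3.1, are assumptions to be verified):

           // TODO: support joins that carry hints; ExtractEquiJoinKeys on Spark 3.1 also
           // returns the JoinHint, which is currently dropped here.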

##########
File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.util
+
+import org.apache.spark.sql.CarbonToSparkAdapter
+
+import scala.collection.JavaConverters._
+import org.apache.spark.sql.catalyst.CarbonParserUtil
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, CreateTableContext, HiveChangeColumnContext}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, QualifiedColType}
+import org.apache.spark.sql.execution.SparkSqlAstBuilder
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel}
+import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableColRenameDataTypeChangeCommand}
+import org.apache.spark.sql.execution.command.table.CarbonExplainCommand
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.spark.sql.types.{DecimalType, StructField}
+
+trait SqlAstBuilderHelper extends SparkSqlAstBuilder {
+
+

Review comment:
       Remove the extra blank line.

##########
File path: mv/plan/src/main/spark2.3/org/apache/carbondata/mv/plans/modular/ExpressionHelper.scala
##########
@@ -47,4 +55,110 @@ object ExpressionHelper {
     reference.qualifier.head
   }
 
+  def getStatisticsObj(outputList: Seq[NamedExpression],
+      plan: LogicalPlan, stats: Statistics,
+      aliasMap: Option[AttributeMap[Attribute]] = None): Statistics = {
+    val output = outputList.map(_.toAttribute)
+    val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
+      table => AttributeMap(table.output.zip(output))
+    }
+    val rewrites = mapSeq.head
+    val attributes: AttributeMap[ColumnStat] = stats.attributeStats
+    var attributeStats = AttributeMap(attributes.iterator
+      .map { pair => (rewrites(pair._1), pair._2) }.toSeq)
+    if (aliasMap.isDefined) {
+      attributeStats = AttributeMap(
+        attributeStats.map(pair => (aliasMap.get(pair._1), pair._2)).toSeq)
+    }
+    Statistics(stats.sizeInBytes, stats.rowCount, attributeStats, stats.hints)
+  }
+
+  def getOptimizedPlan(s: SubqueryExpression): LogicalPlan = {
+    val Subquery(newPlan) = BirdcageOptimizer.execute(Subquery(s.plan))
+    newPlan
+  }
+
+  def normalizeExpressions(r: NamedExpression, attrs: AttributeSeq): NamedExpression = {
+    QueryPlan.normalizeExprId(r, attrs)
+  }
+
+  def attributeMap(rAliasMap: AttributeMap[Attribute]) : AttributeMap[Expression] = {
+    rAliasMap.asInstanceOf[AttributeMap[Expression]]
+  }
+
+  def seqOfRules : Seq[Rule[LogicalPlan]] = {
+    Seq(
+      // Operator push down
+      PushProjectionThroughUnion,
+      ReorderJoin,
+      EliminateOuterJoin,
+      PushPredicateThroughJoin,
+      PushDownPredicate,
+      ColumnPruning,
+      // Operator combine
+      CollapseRepartition,
+      CollapseProject,
+      CollapseWindow,
+      CombineFilters,
+      CombineLimits,
+      CombineUnions,
+      // Constant folding and strength reduction
+      NullPropagation,
+      FoldablePropagation,
+      ConstantFolding,
+      ReorderAssociativeOperator,
+      // No need to apply LikeSimplification rule while creating MV
+      // as modular plan asCompactSql will be set in schema
+      //        LikeSimplification,
+      BooleanSimplification,
+      SimplifyConditionals,
+      RemoveDispensableExpressions,
+      SimplifyBinaryComparison,
+      EliminateSorts,
+      SimplifyCasts,
+      SimplifyCaseConversionExpressions,
+      RewriteCorrelatedScalarSubquery,
+      EliminateSerialization,
+      RemoveRedundantAliases,
+      RemoveRedundantProject)
+  }
+}
+
+trait getVerboseString extends LeafNode {
+}
+
+trait groupByUnaryNode extends UnaryNode {
+}
+
+trait selectModularPlan extends ModularPlan {
+}
+
+trait unionModularPlan extends ModularPlan {
+}
+
+trait oneRowTableLeafNode extends LeafNode {
+}
+
+object MatchJoin {
+  def unapply(plan : LogicalPlan): Option[(LogicalPlan, LogicalPlan, JoinType, Option[Expression],
+    Option[Any])] = {
+    plan match {
+      case j@Join(left, right, joinType, condition) =>
+        val a = Some(left, right, joinType, condition, None)
+        a

Review comment:
       Please refactor and move the common code of ExpressionHelper to a common package for all versions.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
##########
@@ -65,7 +65,10 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   protected val ESCAPECHAR = carbonKeyWord("ESCAPECHAR")
   protected val EXCLUDE = carbonKeyWord("EXCLUDE")
   protected val EXPLAIN = carbonKeyWord("EXPLAIN")
-  protected val EXTENDED = carbonKeyWord("EXTENDED")
+  protected val MODE = carbonKeyWord("EXTENDED") |
+                       carbonKeyWord("CODEGEN") |
+                       carbonKeyWord("COST") |
+                       carbonKeyWord("FORMATTED")

Review comment:
       Please add test cases covering all the MODE keywords (EXTENDED, CODEGEN, COST, FORMATTED) if they are not already added; see the sketch below.
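
       A hedged sketch of such a test (table name and assertions are illustrative, and it assumes the usual `test`/`sql` helpers available in the existing Carbon query test suites):

           test("EXPLAIN should accept all MODE keywords") {
             sql("CREATE TABLE IF NOT EXISTS explain_mode_test (id INT, name STRING) STORED AS carbondata")
             Seq("EXTENDED", "CODEGEN", "COST", "FORMATTED").foreach { mode =>
               // each mode should parse and return a non-empty plan description
               val rows = sql(s"EXPLAIN $mode SELECT * FROM explain_mode_test WHERE id = 1").collect()
               assert(rows.nonEmpty)
             }
             sql("DROP TABLE IF EXISTS explain_mode_test")
           }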

##########
File path: mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
##########
@@ -358,6 +359,12 @@ trait Printers {
           ""
         }
         qualifierPrefix + quoteIdentifier(child.name) + " AS " + quoteIdentifier(a.name)
+      case a@Alias(child: AggregateExpression, _) =>
+        child.sql + " AS " + quoteIdentifier(a.name)

Review comment:
       Build the string with interpolation (s"...") instead of `+` concatenation, as sketched below.
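
       i.e. something along these lines (the same expression rewritten with interpolation):

           case a @ Alias(child: AggregateExpression, _) =>
             s"${child.sql} AS ${quoteIdentifier(a.name)}"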

##########
File path: processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
##########
@@ -109,7 +109,9 @@ public RowParserImpl(DataField[] output, CarbonDataLoadConfiguration configurati
       return new String[numberOfColumns];
     }
     // If number of columns are less in a row then create new array with same size of header.

Review comment:
       Please update the comment according to the new condition.

##########
File path: integration/spark/src/main/spark3.1/org/apache/spark/sql/parser/SparkSqlAstBuilderWrapper.scala
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parser
+
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
+import org.apache.spark.sql.execution.SparkSqlAstBuilder
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * use this wrapper to adapter multiple spark versions
+ */
+abstract class SparkSqlAstBuilderWrapper(conf: SQLConf) extends SparkSqlAstBuilder {

Review comment:
       This can be combined with the 2.3 and 2.4 versions.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/joins/BroadCastSIFilterPushJoin.scala
##########
@@ -147,13 +152,15 @@ object BroadCastSIFilterPushJoin {
       inputCopy: Array[InternalRow],
       leftKeys: Seq[Expression],
       rightKeys: Seq[Expression],
-      buildSide: BuildSide,
+      buildSide: CarbonBuildSideType,
       isIndexTable: Boolean = false): Unit = {
 
+    val carbonBuildSide = CarbonBuildSide(buildSide)

Review comment:
       `CarbonBuildSide` can be made common to 2.4 and 3.1; please refactor accordingly.




--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4141: [CARBONDATA-4190] Integrate Carbondata with Spark 3.1.1 version

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4141:
URL: https://github.com/apache/carbondata/pull/4141#issuecomment-850335259


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3700/
   


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4141: [CARBONDATA-4190] Integrate Carbondata with Spark 3.1.1 version

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4141:
URL: https://github.com/apache/carbondata/pull/4141#issuecomment-850337778


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5445/
   


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4141: [CARBONDATA-4190] Integrate Carbondata with Spark 3.1.1 version

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4141:
URL: https://github.com/apache/carbondata/pull/4141#issuecomment-851478977


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3709/
   


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4141: [CARBONDATA-4190] Integrate Carbondata with Spark 3.1.1 version

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4141:
URL: https://github.com/apache/carbondata/pull/4141#issuecomment-851489591


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3710/
   


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4141: [CARBONDATA-4190] Integrate Carbondata with Spark 3.1.1 version

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4141:
URL: https://github.com/apache/carbondata/pull/4141#issuecomment-851536276


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5454/
   


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4141: [CARBONDATA-4190] Integrate Carbondata with Spark 3.1.1 version

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4141:
URL: https://github.com/apache/carbondata/pull/4141#issuecomment-851601038


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3718/
   


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4141: [CARBONDATA-4190] Integrate Carbondata with Spark 3.1.1 version

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4141:
URL: https://github.com/apache/carbondata/pull/4141#issuecomment-851622588


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3719/
   


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4141: [CARBONDATA-4190] Integrate Carbondata with Spark 3.1.1 version

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4141:
URL: https://github.com/apache/carbondata/pull/4141#issuecomment-851628317


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5463/
   


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4141: [CARBONDATA-4190] Integrate Carbondata with Spark 3.1.1 version

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4141:
URL: https://github.com/apache/carbondata/pull/4141#issuecomment-851853444


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3720/
   


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


1234 ... 7