[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

229 messages


qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r152726082
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---
    @@ -26,13 +26,20 @@ import org.apache.spark.sql.execution.command.management.{AlterTableCompactionCo
     import org.apache.spark.sql.execution.command.partition.ShowCarbonPartitionsCommand
     import org.apache.spark.sql.execution.command.schema.{AlterTableAddColumnCommand, AlterTableDataTypeChangeCommand, AlterTableDropColumnCommand, AlterTableRenameTableCommand}
     import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
    +import org.apache.spark.sql.CarbonExpressions.{CarbonDescribeTable => DescribeTableCommand}
    +import org.apache.spark.sql.catalyst.catalog.CatalogTypes
     
     import org.apache.carbondata.core.util.CarbonUtil
     import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
     
    +
    --- End diff --
   
    remove empty line


---

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r152728216
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---
    @@ -95,11 +165,40 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
         def prepareTargetReleation(relation: UnresolvedRelation): SubqueryAlias = {
           val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId",
             Seq.empty, isDistinct = false), "tupleId")())
    +
    +      val localalias = alias match {
    --- End diff --
   
    use localAlias instead of localalias.


---

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r152728246
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---
    @@ -95,11 +165,40 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
         def prepareTargetReleation(relation: UnresolvedRelation): SubqueryAlias = {
           val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId",
             Seq.empty, isDistinct = false), "tupleId")())
    +
    +      val localalias = alias match {
    +        case Some(a) => Some(alias.toSeq)
    +        case _ => None
    +      }
           val projList = Seq(
    -        UnresolvedAlias(UnresolvedStar(Option(table.alias.toSeq))), tupleId)
    +        UnresolvedAlias(UnresolvedStar(localalias)), tupleId)
           // include tuple id and rest of the required columns in subqury
    -      SubqueryAlias(table.alias.getOrElse(""),
    -        Project(projList, relation), Option(table.tableIdentifier))
    +//      SubqueryAlias(alias.getOrElse(""),
    +//        Project(projList, relation), Option(table.tableIdentifier))
    +//
    +      if (sparkSession.version.contains("2.1")) {
    --- End diff --
   
    use startsWith instead of contains
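The point is worth spelling out, because `contains` genuinely misfires on patch releases. A minimal sketch (the helper names are illustrative, not from the PR):

```scala
// Why startsWith: "2.1" occurs as a SUBSTRING of "2.2.1"
// (characters 3..5), so a contains-based check would route a
// Spark 2.2.1 session into the Spark 2.1 branch.
def isSpark21(version: String): Boolean = version.startsWith("2.1")
def isSpark22(version: String): Boolean = version.startsWith("2.2")

// contains misclassifies the 2.2.1 patch release:
assert("2.2.1".contains("2.1"))
// startsWith routes it correctly:
assert(isSpark22("2.2.1") && !isSpark21("2.2.1"))
assert(isSpark21("2.1.2") && !isSpark22("2.1.2"))
```

The same reasoning applies to every `contains("2.1")` / `contains("2.2")` check flagged in this review, and to the `equals("2.1")` checks in the factory classes, where `equals` fails the other way by rejecting "2.1.x".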


---

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r152728263
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---
    @@ -95,11 +165,40 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
         def prepareTargetReleation(relation: UnresolvedRelation): SubqueryAlias = {
           val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId",
             Seq.empty, isDistinct = false), "tupleId")())
    +
    +      val localalias = alias match {
    +        case Some(a) => Some(alias.toSeq)
    +        case _ => None
    +      }
           val projList = Seq(
    -        UnresolvedAlias(UnresolvedStar(Option(table.alias.toSeq))), tupleId)
    +        UnresolvedAlias(UnresolvedStar(localalias)), tupleId)
           // include tuple id and rest of the required columns in subqury
    -      SubqueryAlias(table.alias.getOrElse(""),
    -        Project(projList, relation), Option(table.tableIdentifier))
    +//      SubqueryAlias(alias.getOrElse(""),
    +//        Project(projList, relation), Option(table.tableIdentifier))
    +//
    +      if (sparkSession.version.contains("2.1")) {
    +        //        SubqueryAlias(table.output.map(_.withQualifier(Some(table.tableName))).toString(),
    +        //          Project(projList, relation), Option(table.tableIdentifier))
    +        val clazz = Utils.classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias")
    +        val ctor = clazz.getConstructors.head
    +        ctor.setAccessible(true)
    +        val subqueryAlias = ctor
    +          .newInstance(alias.getOrElse(""),
    +            Project(projList, relation), Option(table.tableIdentifier)).asInstanceOf[SubqueryAlias]
    +        subqueryAlias
    +      } else if (sparkSession.version.contains("2.2")) {
    --- End diff --
   
    use startsWith instead of contains


---

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r152728384
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---
    @@ -143,52 +246,250 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
           selectPlan
         }
         val finalPlan = if (filter.length > 0) {
    -      val alias = table.alias.getOrElse("")
           var transformed: Boolean = false
           // Create a dummy projection to include filter conditions
           var newPlan: LogicalPlan = null
           if (table.tableIdentifier.database.isDefined) {
             newPlan = parser.parsePlan("select * from  " +
    -           table.tableIdentifier.database.getOrElse("") + "." +
    -           table.tableIdentifier.table + " " + alias + " " + filter)
    +                                   table.tableIdentifier.database.getOrElse("") + "." +
    +                                   table.tableIdentifier.table + " " + alias.getOrElse("") + " " +
    +                                   filter)
    --- End diff --
   
    The indentation of the above three lines is wrong.


---

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r152728413
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---
    @@ -143,52 +246,250 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
           selectPlan
         }
         val finalPlan = if (filter.length > 0) {
    -      val alias = table.alias.getOrElse("")
           var transformed: Boolean = false
           // Create a dummy projection to include filter conditions
           var newPlan: LogicalPlan = null
           if (table.tableIdentifier.database.isDefined) {
             newPlan = parser.parsePlan("select * from  " +
    -           table.tableIdentifier.database.getOrElse("") + "." +
    -           table.tableIdentifier.table + " " + alias + " " + filter)
    +                                   table.tableIdentifier.database.getOrElse("") + "." +
    +                                   table.tableIdentifier.table + " " + alias.getOrElse("") + " " +
    +                                   filter)
           }
           else {
             newPlan = parser.parsePlan("select * from  " +
    -           table.tableIdentifier.table + " " + alias + " " + filter)
    +                                   table.tableIdentifier.table + " " + alias.getOrElse("") + " " +
    +                                   filter)
           }
           newPlan transform {
    -        case UnresolvedRelation(t, Some(a))
    -          if !transformed && t == table.tableIdentifier && a == alias =>
    +        case CarbonUnresolvedRelation(t)
    +          if !transformed && t == table.tableIdentifier =>
               transformed = true
               // Add the filter condition of update statement  on destination table
    -          SubqueryAlias(alias, updatedSelectPlan, Option(table.tableIdentifier))
    +          // SubqueryAlias(alias.getOrElse(""), updatedSelectPlan, Option(table.tableIdentifier))
    +          if (sparkSession.version.contains("2.1")) {
    --- End diff --
   
    use startsWith instead of contains


---

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r152728420
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---
    @@ -143,52 +246,250 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
           selectPlan
         }
         val finalPlan = if (filter.length > 0) {
    -      val alias = table.alias.getOrElse("")
           var transformed: Boolean = false
           // Create a dummy projection to include filter conditions
           var newPlan: LogicalPlan = null
           if (table.tableIdentifier.database.isDefined) {
             newPlan = parser.parsePlan("select * from  " +
    -           table.tableIdentifier.database.getOrElse("") + "." +
    -           table.tableIdentifier.table + " " + alias + " " + filter)
    +                                   table.tableIdentifier.database.getOrElse("") + "." +
    +                                   table.tableIdentifier.table + " " + alias.getOrElse("") + " " +
    +                                   filter)
           }
           else {
             newPlan = parser.parsePlan("select * from  " +
    -           table.tableIdentifier.table + " " + alias + " " + filter)
    +                                   table.tableIdentifier.table + " " + alias.getOrElse("") + " " +
    +                                   filter)
           }
           newPlan transform {
    -        case UnresolvedRelation(t, Some(a))
    -          if !transformed && t == table.tableIdentifier && a == alias =>
    +        case CarbonUnresolvedRelation(t)
    +          if !transformed && t == table.tableIdentifier =>
               transformed = true
               // Add the filter condition of update statement  on destination table
    -          SubqueryAlias(alias, updatedSelectPlan, Option(table.tableIdentifier))
    +          // SubqueryAlias(alias.getOrElse(""), updatedSelectPlan, Option(table.tableIdentifier))
    +          if (sparkSession.version.contains("2.1")) {
    +            // SubqueryAlias(alias1, updatedSelectPlan, Option(table.tableIdentifier))
    +            val clazz = Utils
    +              .classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias")
    +            val ctor = clazz.getConstructors.head
    +            ctor.setAccessible(true)
    +            val subqueryAlias = ctor
    +              .newInstance(alias.getOrElse(""), updatedSelectPlan, Option(table.tableIdentifier))
    +              .asInstanceOf[SubqueryAlias]
    +            subqueryAlias
    +          } else if (sparkSession.version.contains("2.2")) {
    --- End diff --
   
    use startsWith instead of contains


---

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r152728433
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---
    @@ -143,52 +246,250 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
           selectPlan
         }
         val finalPlan = if (filter.length > 0) {
    -      val alias = table.alias.getOrElse("")
           var transformed: Boolean = false
           // Create a dummy projection to include filter conditions
           var newPlan: LogicalPlan = null
           if (table.tableIdentifier.database.isDefined) {
             newPlan = parser.parsePlan("select * from  " +
    -           table.tableIdentifier.database.getOrElse("") + "." +
    -           table.tableIdentifier.table + " " + alias + " " + filter)
    +                                   table.tableIdentifier.database.getOrElse("") + "." +
    +                                   table.tableIdentifier.table + " " + alias.getOrElse("") + " " +
    +                                   filter)
           }
           else {
             newPlan = parser.parsePlan("select * from  " +
    -           table.tableIdentifier.table + " " + alias + " " + filter)
    +                                   table.tableIdentifier.table + " " + alias.getOrElse("") + " " +
    +                                   filter)
           }
           newPlan transform {
    -        case UnresolvedRelation(t, Some(a))
    -          if !transformed && t == table.tableIdentifier && a == alias =>
    +        case CarbonUnresolvedRelation(t)
    +          if !transformed && t == table.tableIdentifier =>
               transformed = true
               // Add the filter condition of update statement  on destination table
    -          SubqueryAlias(alias, updatedSelectPlan, Option(table.tableIdentifier))
    +          // SubqueryAlias(alias.getOrElse(""), updatedSelectPlan, Option(table.tableIdentifier))
    +          if (sparkSession.version.contains("2.1")) {
    +            // SubqueryAlias(alias1, updatedSelectPlan, Option(table.tableIdentifier))
    +            val clazz = Utils
    +              .classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias")
    +            val ctor = clazz.getConstructors.head
    +            ctor.setAccessible(true)
    +            val subqueryAlias = ctor
    +              .newInstance(alias.getOrElse(""), updatedSelectPlan, Option(table.tableIdentifier))
    +              .asInstanceOf[SubqueryAlias]
    +            subqueryAlias
    +          } else if (sparkSession.version.contains("2.2")) {
    +            // SubqueryAlias(table.output.map(_.withQualifier(Some(table.tableName))).toString(),
    +            //  Project(projList, relation))
    +            val clazz = Utils
    +              .classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias")
    +            val ctor = clazz.getConstructors.head
    +            ctor.setAccessible(true)
    +            val subqueryAlias = ctor.newInstance(alias.getOrElse(""), updatedSelectPlan)
    +              .asInstanceOf[SubqueryAlias]
    +            subqueryAlias
    +          } else {
    +            throw new UnsupportedOperationException("Unsupported Spark version")
    +          }
           }
         } else {
           updatedSelectPlan
         }
         val tid = CarbonTableIdentifierImplicit.toTableIdentifier(Seq(table.tableIdentifier.toString()))
         val tidSeq = Seq(GetDB.getDatabaseName(tid.database, sparkSession))
    -    val destinationTable = UnresolvedRelation(table.tableIdentifier, table.alias)
    +    // TODO use reflection
    +    // val destinationTable = UnresolvedRelation(table.tableIdentifier, Some(alias.getOrElse("")))
    +    val destinationTable =
    +      if (sparkSession.version.contains("2.1")) {
    --- End diff --
   
    use startsWith instead of contains


---

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r152728438
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---
    @@ -143,52 +246,250 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
           selectPlan
         }
         val finalPlan = if (filter.length > 0) {
    -      val alias = table.alias.getOrElse("")
           var transformed: Boolean = false
           // Create a dummy projection to include filter conditions
           var newPlan: LogicalPlan = null
           if (table.tableIdentifier.database.isDefined) {
             newPlan = parser.parsePlan("select * from  " +
    -           table.tableIdentifier.database.getOrElse("") + "." +
    -           table.tableIdentifier.table + " " + alias + " " + filter)
    +                                   table.tableIdentifier.database.getOrElse("") + "." +
    +                                   table.tableIdentifier.table + " " + alias.getOrElse("") + " " +
    +                                   filter)
           }
           else {
             newPlan = parser.parsePlan("select * from  " +
    -           table.tableIdentifier.table + " " + alias + " " + filter)
    +                                   table.tableIdentifier.table + " " + alias.getOrElse("") + " " +
    +                                   filter)
           }
           newPlan transform {
    -        case UnresolvedRelation(t, Some(a))
    -          if !transformed && t == table.tableIdentifier && a == alias =>
    +        case CarbonUnresolvedRelation(t)
    +          if !transformed && t == table.tableIdentifier =>
               transformed = true
               // Add the filter condition of update statement  on destination table
    -          SubqueryAlias(alias, updatedSelectPlan, Option(table.tableIdentifier))
    +          // SubqueryAlias(alias.getOrElse(""), updatedSelectPlan, Option(table.tableIdentifier))
    +          if (sparkSession.version.contains("2.1")) {
    +            // SubqueryAlias(alias1, updatedSelectPlan, Option(table.tableIdentifier))
    +            val clazz = Utils
    +              .classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias")
    +            val ctor = clazz.getConstructors.head
    +            ctor.setAccessible(true)
    +            val subqueryAlias = ctor
    +              .newInstance(alias.getOrElse(""), updatedSelectPlan, Option(table.tableIdentifier))
    +              .asInstanceOf[SubqueryAlias]
    +            subqueryAlias
    +          } else if (sparkSession.version.contains("2.2")) {
    +            // SubqueryAlias(table.output.map(_.withQualifier(Some(table.tableName))).toString(),
    +            //  Project(projList, relation))
    +            val clazz = Utils
    +              .classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias")
    +            val ctor = clazz.getConstructors.head
    +            ctor.setAccessible(true)
    +            val subqueryAlias = ctor.newInstance(alias.getOrElse(""), updatedSelectPlan)
    +              .asInstanceOf[SubqueryAlias]
    +            subqueryAlias
    +          } else {
    +            throw new UnsupportedOperationException("Unsupported Spark version")
    +          }
           }
         } else {
           updatedSelectPlan
         }
         val tid = CarbonTableIdentifierImplicit.toTableIdentifier(Seq(table.tableIdentifier.toString()))
         val tidSeq = Seq(GetDB.getDatabaseName(tid.database, sparkSession))
    -    val destinationTable = UnresolvedRelation(table.tableIdentifier, table.alias)
    +    // TODO use reflection
    +    // val destinationTable = UnresolvedRelation(table.tableIdentifier, Some(alias.getOrElse("")))
    +    val destinationTable =
    +      if (sparkSession.version.contains("2.1")) {
    +      val clazz = Utils.classForName("org.apache.spark.sql.catalyst.analysis.UnresolvedRelation")
    +      val ctor = clazz.getConstructors.head
    +      ctor.setAccessible(true)
    +      val unresolvedrelation = ctor
    +        .newInstance(table.tableIdentifier,
    +          Some(alias.getOrElse(""))).asInstanceOf[UnresolvedRelation]
    +        unresolvedrelation
    +    } else if (sparkSession.version.contains("2.2")) {
    --- End diff --
   
    use startsWith instead of contains
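All the reflective `SubqueryAlias` / `UnresolvedRelation` constructions in this diff follow one pattern, which could be factored into a helper. A sketch (the arity-based constructor lookup is a suggested hardening, since the order of `getConstructors` is unspecified; it is not what the PR currently does):

```scala
// Condensed form of the reflection pattern repeated in the diff above:
// invoke a constructor whose arity differs between library versions.
def construct[T](className: String, args: AnyRef*): T = {
  val ctor = Class.forName(className).getConstructors
    // select by argument count instead of getConstructors.head
    .find(_.getParameterCount == args.length)
    .getOrElse(throw new NoSuchMethodException(
      s"$className has no ${args.length}-arg public constructor"))
  ctor.newInstance(args: _*).asInstanceOf[T]
}

// Spark 2.1's SubqueryAlias takes three arguments, Spark 2.2's takes
// two; both versions would then go through the same call site, e.g.:
//   construct[SubqueryAlias](
//     "org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias",
//     aliasName, updatedSelectPlan, Option(table.tableIdentifier)) // 2.1
//   construct[SubqueryAlias](
//     "org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias",
//     aliasName, updatedSelectPlan)                                // 2.2
```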


---

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r152728527
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---
    @@ -115,18 +121,50 @@ class CarbonFileMetastore extends CarbonMetaStore {
         lookupRelation(TableIdentifier(tableName, dbName))(sparkSession)
       }
     
    +  val rm = universe.runtimeMirror(getClass.getClassLoader)
    +
    +  def getField[T: TypeTag : reflect.ClassTag](name: String, obj: T): CatalogTable = {
    +    val im = rm.reflect(obj)
    +    val sym = im.symbol.typeSignature.member(TermName(name))
    +    val tableMeta = im.reflectMethod(sym.asMethod).apply()
    +    tableMeta.asInstanceOf[CatalogTable]
    +  }
    +
       override def lookupRelation(tableIdentifier: TableIdentifier)
         (sparkSession: SparkSession): LogicalPlan = {
         val database = tableIdentifier.database.getOrElse(
           sparkSession.catalog.currentDatabase)
         val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
           case SubqueryAlias(_,
    -      LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
    -      _) =>
    +      LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) =>
             carbonDatasourceHadoopRelation.carbonRelation
           case LogicalRelation(
           carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
             carbonDatasourceHadoopRelation.carbonRelation
    +
    +//      case SubqueryAlias(_, c: CatalogRelation) if sparkSession.version.contains("2.2") &&
    +//                                                   getField("tableMeta", c)
    +//                                                     .asInstanceOf[CatalogTable].provider
    +//                                                     .isDefined &&
    +//                                                   getField("tableMeta", c)
    +//                                                     .asInstanceOf[String]
    +//                                                  .equals("org.apache.spark.sql.CarbonSource") =>
    +//        new CarbonSource()
    +//          .createRelation(sparkSession.sqlContext,
    +//            c.tableMeta.storage.properties)
    +//          .asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation
    +
    +      case SubqueryAlias(_, c: CatalogRelation) if sparkSession.version.contains("2.2") &&
    --- End diff --
   
    use startsWith instead of contains
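The `getField` helper quoted in this diff can be sketched in a slightly more general form (names are illustrative; returning `Any` keeps the cast at the call site, where the expected type is actually known):

```scala
import scala.reflect.runtime.{universe => ru}

// Call a no-argument accessor by name through the Scala runtime
// mirror -- the same technique getField uses to read tableMeta off
// a CatalogRelation without compiling against the Spark 2.2 API.
val mirror = ru.runtimeMirror(getClass.getClassLoader)

def invokeAccessor(obj: Any, name: String): Any = {
  val im = mirror.reflect(obj)
  val accessor = im.symbol.typeSignature.member(ru.TermName(name)).asMethod
  im.reflectMethod(accessor).apply()
}
```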


---

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r152728579
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala ---
    @@ -153,8 +153,11 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
         val dbName = oldTableIdentifier.getDatabaseName
         val tableName = oldTableIdentifier.getTableName
         val schemaParts = CarbonUtil.convertToMultiGsonStrings(wrapperTableInfo, "=", "'", "")
    -    sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive.runSqlHive(
    -      s"ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES($schemaParts)")
    +    val hiveClient = sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
    +      .asInstanceOf[HiveExternalCatalog].client
    +    hiveClient.runSqlHive(s"ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES($schemaParts)")
    +
    +    sparkSession.sessionState
    --- End diff --
   
    This code is unused; remove it.


---

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r152728764
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonOptimizer.scala ---
    @@ -0,0 +1,161 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.hive
    +
    +import org.apache.spark.sql.ExperimentalMethods
    +import org.apache.spark.sql.catalyst.catalog.SessionCatalog
    +import org.apache.spark.sql.catalyst.optimizer.Optimizer
    +import org.apache.spark.sql.hive.CarbonOptimizerCompileCode.AbstractCarbonOptimizerFactory
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.util.ScalaCompilerUtil
    +
    +
    +private[sql] class CarbonOptimizerCodeGenerateFactory(version: String) {
    +
    +  val carbonoptimizerFactory = if (version.equals("2.1")) {
    --- End diff --
   
    use startsWith instead of equals.


---

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r152728865
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlConfFactory.scala ---
    @@ -0,0 +1,96 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.hive
    +
    +import org.apache.spark.internal.config.ConfigBuilder
    +import org.apache.spark.sql.hive.CarbonSqlConfCompileCode.AbstractCarbonSqlConfFactory
    +import org.apache.spark.util.ScalaCompilerUtil
    +
    +
    +private[sql] class CarbonSqlConfCodeGenerateFactory(version: String) {
    +
    +  val carbonSqlConfFactory = if (version.equals("2.1")) {
    --- End diff --
   
    use startsWith instead of equals


---

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r152728991
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala ---
    @@ -32,76 +32,6 @@ class CarbonSQLConf(sparkSession: SparkSession) {
       /**
        * To initialize dynamic param defaults along with usage docs
        */
    -  def addDefaultCarbonParams(): Unit = {
    -    val ENABLE_UNSAFE_SORT =
    -      SQLConfigBuilder(CarbonCommonConstants.ENABLE_UNSAFE_SORT)
    -        .doc("To enable/ disable unsafe sort.")
    -        .booleanConf
    -        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
    -          CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
    -    val CARBON_CUSTOM_BLOCK_DISTRIBUTION =
    -      SQLConfigBuilder(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION)
    -        .doc("To enable/ disable carbon custom block distribution.")
    -        .booleanConf
    -        .createWithDefault(carbonProperties
    -          .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
    -            CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean)
    -    val BAD_RECORDS_LOGGER_ENABLE =
    -      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE)
    -        .doc("To enable/ disable carbon bad record logger.")
    -        .booleanConf
    -        .createWithDefault(CarbonLoadOptionConstants
    -          .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
    -    val BAD_RECORDS_ACTION =
    -      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
    -        .doc("To configure the bad records action.")
    -        .stringConf
    -        .createWithDefault(carbonProperties
    -          .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
    -            CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
    -    val IS_EMPTY_DATA_BAD_RECORD =
    -      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
    -        .doc("Property to decide weather empty data to be considered bad/ good record.")
    -        .booleanConf
    -        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT
    -          .toBoolean)
    -    val SORT_SCOPE =
    -      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE)
    -        .doc("Property to specify sort scope.")
    -        .stringConf
    -        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
    -          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
    -    val BATCH_SORT_SIZE_INMB =
    -      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB)
    -        .doc("Property to specify batch sort size in MB.")
    -        .stringConf
    -        .createWithDefault(carbonProperties
    -          .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
    -            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
    -    val SINGLE_PASS =
    -      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS)
    -        .doc("Property to enable/disable single_pass.")
    -        .booleanConf
    -        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
    -    val BAD_RECORD_PATH =
    -      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH)
    -        .doc("Property to configure the bad record location.")
    -        .stringConf
    -        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
    -          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
    -    val GLOBAL_SORT_PARTITIONS =
    -      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS)
    -        .doc("Property to configure the global sort partitions.")
    -        .stringConf
    -        .createWithDefault(carbonProperties
    -          .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
    -            CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
    -    val DATEFORMAT =
    -      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
    -        .doc("Property to configure data format for date type columns.")
    -        .stringConf
    -        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
    -  }
    --- End diff --
   
    Why do these lines need to be removed?


---
Github user zzcclp commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r152729085
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---
    @@ -17,19 +17,23 @@
     
     package org.apache.spark.sql.parser
     
    +import java.lang.reflect.InvocationTargetException
    +
     import scala.collection.mutable
     import scala.language.implicitConversions
     
     import org.apache.spark.sql.{DeleteRecords, ShowLoadsCommand, UpdateTable}
    -import org.apache.spark.sql.catalyst.CarbonDDLSqlParser
    +import org.apache.spark.sql.catalyst.{CarbonDDLSqlParser, TableIdentifier}
     import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
    -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
     import org.apache.spark.sql.catalyst.plans.logical._
     import org.apache.spark.sql.execution.command._
     import org.apache.spark.sql.execution.command.management.{AlterTableCompactionCommand, CleanFilesCommand, DeleteLoadByIdCommand, DeleteLoadByLoadDateCommand, LoadTableCommand}
     import org.apache.spark.sql.execution.command.partition.{AlterTableDropCarbonPartitionCommand, AlterTableSplitCarbonPartitionCommand}
     import org.apache.spark.sql.execution.command.schema.{AlterTableAddColumnCommand, AlterTableDataTypeChangeCommand, AlterTableDropColumnCommand}
     import org.apache.spark.sql.types.StructField
    +import org.apache.spark.sql.CarbonExpressions.CarbonUnresolvedRelation
    +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedRelation, UnresolvedStar}
    --- End diff --
   
    The order of the imports is wrong.


---
Github user zzcclp commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r152729132
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala ---
    @@ -38,7 +40,26 @@ import org.apache.carbondata.spark.util.CommonUtil
      */
     class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends AbstractSqlParser {
     
    -  val astBuilder = new CarbonSqlAstBuilder(conf)
    +  val parser = new CarbonSpark2SqlParser
    +  val astBuilder = getAstBuilder()
    +
    +  def getAstBuilder(): AstBuilder = {
    +    if (sparkSession.version.contains("2.1")) {
    --- End diff --
   
    Use startsWith instead of contains.
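A minimal Scala sketch of why contains is unsafe here (the version strings are illustrative assumptions): a Spark 2.2 patch release such as 2.2.1 contains the substring "2.1", so a contains-based check would take the 2.1 branch by mistake.

```scala
// Sketch: substring matching misclassifies patch releases.
val version = "2.2.1" // hypothetical Spark 2.2 patch release

// "2.1" appears inside "2.2.1", so contains wrongly matches Spark 2.1
assert(version.contains("2.1"))

// startsWith only matches a genuine 2.1.x version string
assert(!version.startsWith("2.1"))
assert(version.startsWith("2.2"))
```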


---
Github user zzcclp commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r152729141
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala ---
    @@ -38,7 +40,26 @@ import org.apache.carbondata.spark.util.CommonUtil
      */
     class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends AbstractSqlParser {
     
    -  val astBuilder = new CarbonSqlAstBuilder(conf)
    +  val parser = new CarbonSpark2SqlParser
    +  val astBuilder = getAstBuilder()
    +
    +  def getAstBuilder(): AstBuilder = {
    +    if (sparkSession.version.contains("2.1")) {
    +      val clazz = Utils.classForName("org.apache.spark.sql.hive.CarbonSqlAstBuilder")
    +      val ctor = clazz.getConstructors.head
    +      ctor.setAccessible(true)
    +      val astBuilder = ctor.newInstance(conf, parser).asInstanceOf[AstBuilder]
    +      astBuilder
    +    } else if (sparkSession.version.contains("2.2")) {
    --- End diff --
   
    Use startsWith instead of contains.


---
Github user zzcclp commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r152730468
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala ---
    @@ -38,7 +40,26 @@ import org.apache.carbondata.spark.util.CommonUtil
      */
     class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends AbstractSqlParser {
     
    -  val astBuilder = new CarbonSqlAstBuilder(conf)
    +  val parser = new CarbonSpark2SqlParser
    +  val astBuilder = getAstBuilder()
    +
    +  def getAstBuilder(): AstBuilder = {
    +    if (sparkSession.version.contains("2.1")) {
    +      val clazz = Utils.classForName("org.apache.spark.sql.hive.CarbonSqlAstBuilder")
    +      val ctor = clazz.getConstructors.head
    +      ctor.setAccessible(true)
    +      val astBuilder = ctor.newInstance(conf, parser).asInstanceOf[AstBuilder]
    +      astBuilder
    +    } else if (sparkSession.version.contains("2.2")) {
    +      val clazz = Utils.classForName("org.apache.spark.sql.hive.CarbonSqlAstBuilder")
    +      val ctor = clazz.getConstructors.head
    +      ctor.setAccessible(true)
    +      val astBuilder = ctor.newInstance(conf, parser).asInstanceOf[AstBuilder]
    +      astBuilder
    --- End diff --
   
    What is the difference between the code for 2.1 and 2.2?
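The two branches in the quoted diff are identical, which appears to be what the reviewer is pointing out. As a hedged sketch only (the helper name is hypothetical, and plain Class.forName is used here instead of Spark's Utils.classForName), the duplicated reflective construction could be collapsed into a single helper, with any version-specific class name decided at the call site:

```scala
// Sketch: one reflective-construction helper replacing the duplicated
// 2.1/2.2 branches. The class name and constructor args are passed in,
// so version-specific differences stay at the call site.
def buildByReflection(className: String, args: AnyRef*): AnyRef = {
  val clazz = Class.forName(className)
  val ctor = clazz.getConstructors.head // assumes a single public ctor
  ctor.setAccessible(true)
  ctor.newInstance(args: _*)
}
```

A call site would then look something like buildByReflection("org.apache.spark.sql.hive.CarbonSqlAstBuilder", conf, parser).asInstanceOf[AstBuilder].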


---
Github user zzcclp commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r152730610
 
    --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
    @@ -16,6 +16,7 @@
      */
     package org.apache.spark.sql.hive
     
    +
    --- End diff --
   
    Remove the empty line.


---
Github user zzcclp commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r152730664
 
    --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
    @@ -84,8 +87,8 @@ class CarbonSessionCatalog(
         var toRefreshRelation = false
         rtnRelation match {
           case SubqueryAlias(_,
    -          LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
    -          _) =>
    +      LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
    --- End diff --
   
    Wrong indentation.


---