[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1


[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

qiuchenjian-2
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153094548
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---
    @@ -143,52 +168,63 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
           selectPlan
         }
         val finalPlan = if (filter.length > 0) {
    -      val alias = table.alias.getOrElse("")
           var transformed: Boolean = false
           // Create a dummy projection to include filter conditions
           var newPlan: LogicalPlan = null
           if (table.tableIdentifier.database.isDefined) {
             newPlan = parser.parsePlan("select * from  " +
    -           table.tableIdentifier.database.getOrElse("") + "." +
    -           table.tableIdentifier.table + " " + alias + " " + filter)
    +                                   table.tableIdentifier.database.getOrElse("") + "." +
    +                                   table.tableIdentifier.table + " " + alias.getOrElse("") + " " +
    +                                   filter)
           }
           else {
             newPlan = parser.parsePlan("select * from  " +
    -           table.tableIdentifier.table + " " + alias + " " + filter)
    +                                   table.tableIdentifier.table + " " + alias.getOrElse("") + " " +
    +                                   filter)
           }
           newPlan transform {
    -        case UnresolvedRelation(t, Some(a))
    -          if !transformed && t == table.tableIdentifier && a == alias =>
    +        case CarbonUnresolvedRelation(t)
    +          if !transformed && t == table.tableIdentifier =>
               transformed = true
    -          // Add the filter condition of update statement  on destination table
    -          SubqueryAlias(alias, updatedSelectPlan, Option(table.tableIdentifier))
    +
    +          val subqueryAlias = CarbonClassReflectionUtils
    +            .getSubqueryAlias(sparkSession, alias, updatedSelectPlan, Some(table.tableIdentifier))
    +          subqueryAlias
           }
         } else {
           updatedSelectPlan
         }
         val tid = CarbonTableIdentifierImplicit.toTableIdentifier(Seq(table.tableIdentifier.toString()))
         val tidSeq = Seq(GetDB.getDatabaseName(tid.database, sparkSession))
    -    val destinationTable = UnresolvedRelation(table.tableIdentifier, table.alias)
    +    val destinationTable = CarbonClassReflectionUtils
    +      .getUnresolvedRelation(table.tableIdentifier, alias)
    --- End diff --
   
    Done
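
    For context on why getSubqueryAlias goes through reflection: SubqueryAlias changed arity between Spark releases, so a single compiled call site cannot match both. A minimal sketch of such a helper, assuming the 2.1 three-argument and 2.2 two-argument constructors (this is not the exact code in CarbonClassReflectionUtils):

        import org.apache.spark.sql.SparkSession
        import org.apache.spark.sql.catalyst.TableIdentifier
        import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

        object SubqueryAliasSketch {
          // Build a SubqueryAlias without a compile-time dependency on either
          // Spark version's constructor. The .get assumes the matching Spark
          // version is the one on the classpath.
          def getSubqueryAlias(
              sparkSession: SparkSession,
              alias: Option[String],
              child: LogicalPlan,
              tableIdentifier: Option[TableIdentifier]): LogicalPlan = {
            val clazz = Class.forName(
              "org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias")
            if (sparkSession.version.startsWith("2.1")) {
              // 2.1: SubqueryAlias(alias: String, child: LogicalPlan,
              //                    view: Option[TableIdentifier])
              val ctor = clazz.getConstructors.find(_.getParameterCount == 3).get
              ctor.newInstance(alias.getOrElse(""), child, tableIdentifier)
                .asInstanceOf[LogicalPlan]
            } else {
              // 2.2: SubqueryAlias(alias: String, child: LogicalPlan)
              val ctor = clazz.getConstructors.find(_.getParameterCount == 2).get
              ctor.newInstance(alias.getOrElse(""), child).asInstanceOf[LogicalPlan]
            }
          }
        }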


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153094607
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---
    @@ -143,52 +168,63 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
           selectPlan
         }
         val finalPlan = if (filter.length > 0) {
    -      val alias = table.alias.getOrElse("")
           var transformed: Boolean = false
           // Create a dummy projection to include filter conditions
           var newPlan: LogicalPlan = null
           if (table.tableIdentifier.database.isDefined) {
             newPlan = parser.parsePlan("select * from  " +
    -           table.tableIdentifier.database.getOrElse("") + "." +
    -           table.tableIdentifier.table + " " + alias + " " + filter)
    +                                   table.tableIdentifier.database.getOrElse("") + "." +
    +                                   table.tableIdentifier.table + " " + alias.getOrElse("") + " " +
    +                                   filter)
           }
           else {
             newPlan = parser.parsePlan("select * from  " +
    -           table.tableIdentifier.table + " " + alias + " " + filter)
    +                                   table.tableIdentifier.table + " " + alias.getOrElse("") + " " +
    +                                   filter)
           }
           newPlan transform {
    -        case UnresolvedRelation(t, Some(a))
    -          if !transformed && t == table.tableIdentifier && a == alias =>
    +        case CarbonUnresolvedRelation(t)
    +          if !transformed && t == table.tableIdentifier =>
               transformed = true
    -          // Add the filter condition of update statement  on destination table
    -          SubqueryAlias(alias, updatedSelectPlan, Option(table.tableIdentifier))
    +
    +          val subqueryAlias = CarbonClassReflectionUtils
    +            .getSubqueryAlias(sparkSession, alias, updatedSelectPlan, Some(table.tableIdentifier))
    +          subqueryAlias
           }
         } else {
           updatedSelectPlan
         }
         val tid = CarbonTableIdentifierImplicit.toTableIdentifier(Seq(table.tableIdentifier.toString()))
         val tidSeq = Seq(GetDB.getDatabaseName(tid.database, sparkSession))
    -    val destinationTable = UnresolvedRelation(table.tableIdentifier, table.alias)
    +    val destinationTable = CarbonClassReflectionUtils
    +      .getUnresolvedRelation(table.tableIdentifier, alias)
    +
         ProjectForUpdate(destinationTable, columns, Seq(finalPlan))
       }
     
    -  def processDeleteRecordsQuery(selectStmt: String, table: UnresolvedRelation): LogicalPlan = {
    -   val tidSeq = Seq(GetDB.getDatabaseName(table.tableIdentifier.database, sparkSession),
    -     table.tableIdentifier.table)
    +
    +  def processDeleteRecordsQuery(selectStmt: String,
    +      alias: Option[String],
    +      table: UnresolvedRelation): LogicalPlan = {
    +    val tidSeq = Seq(GetDB.getDatabaseName(table.tableIdentifier.database, sparkSession),
    +      table.tableIdentifier.table)
         var addedTupleId = false
         val parsePlan = parser.parsePlan(selectStmt)
    +
         val selectPlan = parsePlan transform {
           case relation: UnresolvedRelation
             if table.tableIdentifier == relation.tableIdentifier && !addedTupleId =>
             addedTupleId = true
             val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId",
               Seq.empty, isDistinct = false), "tupleId")())
    -        val alias = table.alias match {
    -          case Some(alias) => Some(table.alias.toSeq)
    +
    +        val localalias = alias match {
    --- End diff --
   
    Done
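
    For readers tracing the delete flow above: the rewrite injects a synthetic tupleId column into the target relation so the generated select identifies the exact rows to delete. A self-contained sketch of that transform, assuming the Spark 2.2 single-argument UnresolvedRelation (names here are illustrative, not the PR's final code):

        import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
        import org.apache.spark.sql.catalyst.expressions.Alias
        import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}

        // Wrap the first occurrence of the target table in a projection that
        // keeps all of its columns (so the WHERE clause still resolves) and
        // additionally emits getTupleId() as "tupleId".
        def addTupleId(plan: LogicalPlan, target: UnresolvedRelation): LogicalPlan = {
          var added = false
          plan transform {
            case relation: UnresolvedRelation
              if relation.tableIdentifier == target.tableIdentifier && !added =>
              added = true
              val tupleId = UnresolvedAlias(Alias(UnresolvedFunction(
                "getTupleId", Seq.empty, isDistinct = false), "tupleId")())
              Project(Seq(UnresolvedAlias(UnresolvedStar(None)), tupleId), relation)
          }
        }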


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153094754
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---
    @@ -201,8 +237,10 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
       override def apply(logicalplan: LogicalPlan): LogicalPlan = {
     
         logicalplan transform {
    -      case UpdateTable(t, cols, sel, where) => processUpdateQuery(t, cols, sel, where)
    -      case DeleteRecords(statement, table) => processDeleteRecordsQuery(statement, table)
    +      case UpdateTable(t, cols, sel, alias, where) => processUpdateQuery(t, cols, sel, alias, where)
    +      case DeleteRecords(statement, alias, table) => processDeleteRecordsQuery(statement,
    +        alias,
    +        table)
    --- End diff --
   
    Done


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153094856
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---
    @@ -115,18 +121,37 @@ class CarbonFileMetastore extends CarbonMetaStore {
         lookupRelation(TableIdentifier(tableName, dbName))(sparkSession)
       }
     
    +  val rm = universe.runtimeMirror(getClass.getClassLoader)
    +
    +  def getField[T: TypeTag : reflect.ClassTag](name: String, obj: T): CatalogTable = {
    --- End diff --
   
    Ok


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153095079
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---
    @@ -115,18 +121,37 @@ class CarbonFileMetastore extends CarbonMetaStore {
         lookupRelation(TableIdentifier(tableName, dbName))(sparkSession)
       }
     
    +  val rm = universe.runtimeMirror(getClass.getClassLoader)
    +
    +  def getField[T: TypeTag : reflect.ClassTag](name: String, obj: T): CatalogTable = {
    +    val im = rm.reflect(obj)
    +    val sym = im.symbol.typeSignature.member(TermName(name))
    +    val tableMeta = im.reflectMethod(sym.asMethod).apply()
    +    tableMeta.asInstanceOf[CatalogTable]
    --- End diff --
   
    Ok
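
    The helper quoted above relies on Scala runtime reflection so the same build can read CatalogRelation.tableMeta on Spark 2.2 without a compile-time dependency on a type that only exists there. The same pattern in isolation (Person is just an illustration):

        import scala.reflect.runtime.{universe => ru}

        object FieldReflectionSketch {
          private val rm = ru.runtimeMirror(getClass.getClassLoader)

          // Resolve a member by name on an arbitrary object and invoke its
          // accessor at runtime, exactly as getField does for "tableMeta".
          def readField[T: ru.TypeTag : scala.reflect.ClassTag](name: String, obj: T): Any = {
            val im = rm.reflect(obj)
            val sym = im.symbol.typeSignature.member(ru.TermName(name))
            im.reflectMethod(sym.asMethod).apply()
          }

          case class Person(firstName: String)

          def main(args: Array[String]): Unit = {
            println(readField("firstName", Person("Ada")))   // prints Ada
          }
        }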


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153095100
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---
    @@ -115,18 +121,37 @@ class CarbonFileMetastore extends CarbonMetaStore {
         lookupRelation(TableIdentifier(tableName, dbName))(sparkSession)
       }
     
    +  val rm = universe.runtimeMirror(getClass.getClassLoader)
    +
    +  def getField[T: TypeTag : reflect.ClassTag](name: String, obj: T): CatalogTable = {
    +    val im = rm.reflect(obj)
    +    val sym = im.symbol.typeSignature.member(TermName(name))
    +    val tableMeta = im.reflectMethod(sym.asMethod).apply()
    +    tableMeta.asInstanceOf[CatalogTable]
    +  }
    +
       override def lookupRelation(tableIdentifier: TableIdentifier)
         (sparkSession: SparkSession): LogicalPlan = {
         val database = tableIdentifier.database.getOrElse(
           sparkSession.catalog.currentDatabase)
         val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
           case SubqueryAlias(_,
    -      LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
    -      _) =>
    +      LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) =>
             carbonDatasourceHadoopRelation.carbonRelation
           case LogicalRelation(
           carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
             carbonDatasourceHadoopRelation.carbonRelation
    +      case SubqueryAlias(_, c: CatalogRelation) if sparkSession.version.contains("2.2") &&
    +                                                   getField("tableMeta", c)
    +                                                     .asInstanceOf[CatalogTable].provider
    +                                                     .isDefined &&
    +                                                   getField("tableMeta", c)
    +                                                     .asInstanceOf[CatalogTable].provider.get
    +                                                     .equals("org.apache.spark.sql.CarbonSource") =>
    --- End diff --
   
    Done
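
    On the review point itself: the guard reads tableMeta twice; a small predicate keeps the match arm readable (a sketch built on the getField helper above; isCarbonSourceTable is a hypothetical name):

        // Hypothetical helper: true when a Spark 2.2 CatalogRelation is backed
        // by CarbonSource. Relies on getField defined earlier in this file.
        private def isCarbonSourceTable(relation: AnyRef): Boolean = {
          val tableMeta = getField("tableMeta", relation)   // returns CatalogTable
          tableMeta.provider.isDefined &&
          tableMeta.provider.get.equals("org.apache.spark.sql.CarbonSource")
        }

        // The match arm then collapses to:
        //   case SubqueryAlias(_, c: CatalogRelation)
        //     if sparkSession.version.contains("2.2") && isCarbonSourceTable(c) => ...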


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153095114
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala ---
    @@ -153,8 +153,11 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
         val dbName = oldTableIdentifier.getDatabaseName
         val tableName = oldTableIdentifier.getTableName
         val schemaParts = CarbonUtil.convertToMultiGsonStrings(wrapperTableInfo, "=", "'", "")
    -    sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive.runSqlHive(
    -      s"ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES($schemaParts)")
    +    val hiveClient = sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
    +      .asInstanceOf[HiveExternalCatalog].client
    +    hiveClient.runSqlHive(s"ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES($schemaParts)")
    +
    +    sparkSession.sessionState
    --- End diff --
   
    Removed
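
    For context: sessionState.metadataHive is not exposed on Spark 2.2, so the Hive client has to be fetched through the shared external catalog. A standalone sketch of the same calls (it must live in the org.apache.spark.sql.hive package, as this file does, because client is package-visible):

        package org.apache.spark.sql.hive

        import org.apache.spark.sql.SparkSession

        object HiveDdlSketch {
          // Reach the HiveClient through the shared external catalog and run
          // a raw DDL statement, as the metastore update above does.
          def runHiveDdl(sparkSession: SparkSession, ddl: String): Unit = {
            val hiveClient = sparkSession.sharedState.externalCatalog
              .asInstanceOf[HiveExternalCatalog].client
            hiveClient.runSqlHive(ddl)
          }
        }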


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153095139
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala ---
    @@ -172,7 +172,13 @@ case class CarbonRelation(
       }
     
       // TODO: Use data from the footers.
    -  override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes)
    +  // TODO For 2.1
    +  //  override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes)
    +  // Todo for 2.2
    +  //  override def computeStats(conf: SQLConf): Statistics = Statistics(sizeInBytes =
    +  //  this.sizeInBytes)
    +
    +  // override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes)
    --- End diff --
   
    Will resolve it as soon as possible.
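
    The conflict being parked here: Spark 2.1 exposes statistics as a lazy val, while 2.2 replaced it with computeStats(conf), so one source file cannot override both. A sketch of each shape (the 2.2 signature is the one from that release's logical plan API):

        import org.apache.spark.sql.catalyst.expressions.Attribute
        import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
        import org.apache.spark.sql.internal.SQLConf

        // Spark 2.2 shape: statistics are computed on demand from the conf.
        case class SizedRelation(output: Seq[Attribute], tableSizeInBytes: BigInt)
          extends LeafNode {
          override def computeStats(conf: SQLConf): Statistics =
            Statistics(sizeInBytes = tableSizeInBytes)
          // Spark 2.1 equivalent (cannot coexist in the same compiled class):
          //   override lazy val statistics = Statistics(sizeInBytes = tableSizeInBytes)
        }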


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153095165
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala ---
    @@ -32,76 +32,6 @@ class CarbonSQLConf(sparkSession: SparkSession) {
       /**
        * To initialize dynamic param defaults along with usage docs
        */
    -  def addDefaultCarbonParams(): Unit = {
    -    val ENABLE_UNSAFE_SORT =
    -      SQLConfigBuilder(CarbonCommonConstants.ENABLE_UNSAFE_SORT)
    -        .doc("To enable/ disable unsafe sort.")
    -        .booleanConf
    -        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
    -          CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
    -    val CARBON_CUSTOM_BLOCK_DISTRIBUTION =
    -      SQLConfigBuilder(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION)
    -        .doc("To enable/ disable carbon custom block distribution.")
    -        .booleanConf
    -        .createWithDefault(carbonProperties
    -          .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
    -            CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean)
    -    val BAD_RECORDS_LOGGER_ENABLE =
    -      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE)
    -        .doc("To enable/ disable carbon bad record logger.")
    -        .booleanConf
    -        .createWithDefault(CarbonLoadOptionConstants
    -          .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
    -    val BAD_RECORDS_ACTION =
    -      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
    -        .doc("To configure the bad records action.")
    -        .stringConf
    -        .createWithDefault(carbonProperties
    -          .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
    -            CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
    -    val IS_EMPTY_DATA_BAD_RECORD =
    -      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
    -        .doc("Property to decide weather empty data to be considered bad/ good record.")
    -        .booleanConf
    -        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT
    -          .toBoolean)
    -    val SORT_SCOPE =
    -      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE)
    -        .doc("Property to specify sort scope.")
    -        .stringConf
    -        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
    -          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
    -    val BATCH_SORT_SIZE_INMB =
    -      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB)
    -        .doc("Property to specify batch sort size in MB.")
    -        .stringConf
    -        .createWithDefault(carbonProperties
    -          .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
    -            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
    -    val SINGLE_PASS =
    -      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS)
    -        .doc("Property to enable/disable single_pass.")
    -        .booleanConf
    -        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
    -    val BAD_RECORD_PATH =
    -      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH)
    -        .doc("Property to configure the bad record location.")
    -        .stringConf
    -        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
    -          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
    -    val GLOBAL_SORT_PARTITIONS =
    -      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS)
    -        .doc("Property to configure the global sort partitions.")
    -        .stringConf
    -        .createWithDefault(carbonProperties
    -          .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
    -            CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
    -    val DATEFORMAT =
    -      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
    -        .doc("Property to configure data format for date type columns.")
    -        .stringConf
    -        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
    -  }
    --- End diff --
   
    These are no longer referenced anywhere.


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153095183
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala ---
    @@ -38,7 +38,6 @@ import org.apache.carbondata.core.stats.QueryStatistic
     import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
     import org.apache.carbondata.spark.CarbonAliasDecoderRelation
     
    -
    --- End diff --
   
    Ok


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153095637
 
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala ---
    @@ -34,7 +34,7 @@ class TestDescribeTable extends QueryTest with BeforeAndAfterAll {
         sql("CREATE TABLE Desc2(Dec2Col1 BigInt, Dec2Col2 String, Dec2Col3 Bigint, Dec2Col4 Decimal) stored by 'carbondata'")
       }
     
    -  test("test describe table") {
    +  ignore("test describe table") {
    --- End diff --
   
    Why ignore this?


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153095841
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---
    @@ -177,7 +213,24 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     
       protected lazy val table: Parser[UnresolvedRelation] = {
         rep1sep(attributeName, ".") ~ opt(ident) ^^ {
    -      case tableIdent ~ alias => UnresolvedRelation(tableIdent, alias)
    +      case tableIdent ~ alias => UnresolvedRelation(tableIdent)
    +    }
    +  }
    +
    +  protected lazy val aliasTable: Parser[(UnresolvedRelation, List[String], Option[String],
    +    TableIdentifier)] = {
    +    rep1sep(attributeName, ".") ~ opt(ident) ^^ {
    +      case tableIdent ~ alias =>
    +
    +        val tableIdentifier: TableIdentifier = toTableIdentifier(tableIdent)
    +        val localAlias: Option[String] = alias
    --- End diff --
   
    Done
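
    For completeness, the tail of aliasTable cut off by the diff anchor plausibly just packages the four values (a hypothetical completion, assuming the Spark 2.2 single-argument UnresolvedRelation; not a quote of the merged code):

        protected lazy val aliasTable: Parser[(UnresolvedRelation, List[String],
            Option[String], TableIdentifier)] = {
          rep1sep(attributeName, ".") ~ opt(ident) ^^ {
            case tableIdent ~ alias =>
              val tableIdentifier: TableIdentifier = toTableIdentifier(tableIdent)
              // Hypothetical: hand back everything the IUD rules need, so the
              // alias survives even though UnresolvedRelation no longer carries it.
              (UnresolvedRelation(tableIdentifier), tableIdent, alias, tableIdentifier)
          }
        }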


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153095195
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---
    @@ -17,19 +17,23 @@
     
     package org.apache.spark.sql.parser
     
    +import java.lang.reflect.InvocationTargetException
    --- End diff --
   
    Ok


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153095958
 
    --- Diff: integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala ---
    @@ -85,71 +85,75 @@ class AlterTableValidationTestCase extends Spark2QueryTest with BeforeAndAfterAl
           "('DICTIONARY_EXCLUDE'='nodict', 'DEFAULT.VALUE.NoDict'= 'abcd')")
         checkAnswer(sql("select distinct(nodict) from restructure"), Row("abcd"))
       }
    -  test("test add timestamp no dictionary column") {
    -    sql(
    -      "alter table restructure add columns(tmpstmp timestamp) TBLPROPERTIES ('DEFAULT.VALUE" +
    -      ".tmpstmp'= '17-01-2007')")
    -    checkAnswer(sql("select distinct(tmpstmp) from restructure"),
    -      Row(new java.sql.Timestamp(107, 0, 17, 0, 0, 0, 0)))
    -    checkExistence(sql("desc restructure"), true, "tmpstmptimestamp")
    -  }
    -
    -  test("test add timestamp direct dictionary column") {
    -    sql(
    -      "alter table restructure add columns(tmpstmp1 timestamp) TBLPROPERTIES ('DEFAULT.VALUE" +
    -      ".tmpstmp1'= '17-01-3007','DICTIONARY_INCLUDE'='tmpstmp1')")
    -    checkAnswer(sql("select distinct(tmpstmp1) from restructure"),
    -      Row(null))
    -    checkExistence(sql("desc restructure"), true, "tmpstmptimestamp")
    -  }
    -
    -  test("test add timestamp column and load as dictionary") {
    -    sql("create table table1(name string) stored by 'carbondata'")
    -    sql("insert into table1 select 'abc'")
    -    sql("alter table table1 add columns(tmpstmp timestamp) TBLPROPERTIES " +
    -        "('DEFAULT.VALUE.tmpstmp'='17-01-3007','DICTIONARY_INCLUDE'= 'tmpstmp')")
    -    sql("insert into table1 select 'name','17-01-2007'")
    -    checkAnswer(sql("select * from table1"),
    -      Seq(Row("abc",null),
    -        Row("name",Timestamp.valueOf("2007-01-17 00:00:00.0"))))
    -  }
    -
    -  test("test add msr column") {
    -    sql(
    -      "alter table restructure add columns(msrField decimal(5,2))TBLPROPERTIES ('DEFAULT.VALUE" +
    -      ".msrfield'= '123.45')")
    -    checkExistence(sql("desc restructure"), true, "msrfielddecimal(5,2)")
    -    val output = sql("select msrField from restructure").collect
    -    checkAnswer(sql("select distinct(msrField) from restructure"),
    -      Row(new BigDecimal("123.45").setScale(2, RoundingMode.HALF_UP)))
    -  }
    -
    -  test("test add all datatype supported dictionary column") {
    -    sql(
    -      "alter table restructure add columns(strfld string, datefld date, tptfld timestamp, " +
    -      "shortFld smallInt, " +
    -      "intFld int, longFld bigint, dblFld double,dcml decimal(5,4))TBLPROPERTIES" +
    -      "('DICTIONARY_INCLUDE'='datefld,shortFld,intFld,longFld,dblFld,dcml', 'DEFAULT.VALUE" +
    -      ".dblFld'= '12345')")
    -    checkAnswer(sql("select distinct(dblFld) from restructure"),
    -      Row(java.lang.Double.parseDouble("12345")))
    -    checkExistence(sql("desc restructure"), true, "strfldstring")
    -    checkExistence(sql("desc restructure"), true, "dateflddate")
    -    checkExistence(sql("desc restructure"), true, "tptfldtimestamp")
    -    checkExistence(sql("desc restructure"), true, "shortfldsmallint")
    -    checkExistence(sql("desc restructure"), true, "intfldint")
    -    checkExistence(sql("desc restructure"), true, "longfldbigint")
    -    checkExistence(sql("desc restructure"), true, "dblflddouble")
    -    checkExistence(sql("desc restructure"), true, "dcmldecimal(5,4)")
    -  }
    -
    -  test(
    -    "test add decimal without scale and precision, default precision and scale (10,0) should be " +
    -    "used")
    -  {
    -    sql("alter table restructure add columns(dcmldefault decimal)")
    -    checkExistence(sql("desc restructure"), true, "dcmldefaultdecimal(10,0)")
    -  }
    +//  test("test add timestamp no dictionary column") {
    --- End diff --
   
    Ok


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153095978
 
    --- Diff: pom.xml ---
    @@ -509,6 +501,8 @@
                     <sourceDirectory>${basedir}/integration/hive/src/main/java</sourceDirectory>
                     <sourceDirectory>${basedir}/integration/presto/src/main/scala</sourceDirectory>
                     <sourceDirectory>${basedir}/integration/presto/src/main/java</sourceDirectory>
    +                <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
    +                <sourceDirectory>${basedir}/streaming/src/main/java</sourceDirectory>
    --- End diff --
   
    Removed


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153096000
 
    --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
    @@ -207,3 +213,26 @@ class CarbonOptimizer(
         super.execute(transFormedPlan)
       }
     }
    +
    +class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) extends
    +  SparkSqlAstBuilder(conf) {
    +
    +  val helper = new CarbonHelperqlAstBuilder(conf, parser)
    +
    +  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
    +    val fileStorage = helper.getFileStorage(ctx.createFileFormat)
    +
    +    if (fileStorage.equalsIgnoreCase("'carbondata'") ||
    +        fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
    +      helper
    +        .createCarbontable(ctx.createTableHeader,
    --- End diff --
   
    Done


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153096010
 
    --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
    @@ -150,9 +153,8 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
             DataSourceAnalysis(conf) ::
             (if (conf.runSQLonFile) {
               new ResolveDataSource(sparkSession) :: Nil
    -        } else {
    -          Nil
    -        })
    +        } else {  Nil }
    +          )
    --- End diff --
   
    Ok


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153095995
 
    --- Diff: integration/spark2/src/main/spark2.2/CarbonSessionState.scala ---
    @@ -0,0 +1,256 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.hive
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
    +import org.apache.spark.sql.catalyst.catalog._
    +import org.apache.spark.sql.catalyst.expressions.ScalarSubquery
    +import org.apache.spark.sql.catalyst.optimizer.Optimizer
    +import org.apache.spark.sql.catalyst.parser.ParserInterface
    +import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateHiveTableContext, CreateTableContext}
    +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
    +import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation, PreWriteCheck, ResolveSQLOnFile, _}
    +import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy}
    +import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
    +import org.apache.spark.sql.hive.client.HiveClient
    +import org.apache.spark.sql.internal.{SQLConf, SessionState}
    +import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
    +import org.apache.spark.sql.parser.{CarbonHelperqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser}
    +
    +import org.apache.carbondata.core.datamap.DataMapStoreManager
    +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
    +
    +/**
    + * This class will have carbon catalog and refresh the relation from cache if the carbontable in
    + * carbon catalog is not same as cached carbon relation's carbon table
    + *
    + * @param externalCatalog
    + * @param globalTempViewManager
    + * @param sparkSession
    + * @param functionResourceLoader
    + * @param functionRegistry
    + * @param conf
    + * @param hadoopConf
    + */
    +class CarbonSessionCatalog(
    +    externalCatalog: HiveExternalCatalog,
    +    globalTempViewManager: GlobalTempViewManager,
    +    functionRegistry: FunctionRegistry,
    +    sparkSession: SparkSession,
    +    conf: SQLConf,
    +    hadoopConf: Configuration,
    +    parser: ParserInterface,
    +    functionResourceLoader: FunctionResourceLoader)
    +  extends HiveSessionCatalog(
    +    externalCatalog,
    +    globalTempViewManager,
    +    new HiveMetastoreCatalog(sparkSession),
    +    functionRegistry,
    +    conf,
    +    hadoopConf,
    +    parser,
    +    functionResourceLoader
    +  ) {
    +
    +  lazy val carbonEnv = {
    +    val env = new CarbonEnv
    +    env.init(sparkSession)
    +    env
    +  }
    +
    +  def getCarbonEnv() : CarbonEnv = {
    +    carbonEnv
    +  }
    +
    +
    +  private def refreshRelationFromCache(identifier: TableIdentifier,
    +      carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = {
    +    var isRefreshed = false
    +    val storePath = CarbonEnv.getInstance(sparkSession).storePath
    +    carbonEnv.carbonMetastore.
    +      checkSchemasModifiedTimeAndReloadTables(storePath)
    +
    +    val tableMeta = carbonEnv.carbonMetastore
    +      .getTableFromMetadataCache(carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
    +        carbonDatasourceHadoopRelation.carbonTable.getFactTableName)
    +    if (tableMeta.isEmpty || (tableMeta.isDefined &&
    +                              tableMeta.get.carbonTable.getTableLastUpdatedTime !=
    +                              carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) {
    +      refreshTable(identifier)
    +      DataMapStoreManager.getInstance().
    +        clearDataMap(AbsoluteTableIdentifier.from(storePath,
    +          identifier.database.getOrElse("default"), identifier.table))
    +      isRefreshed = true
    +      logInfo(s"Schema changes have been detected for table: $identifier")
    +    }
    +    isRefreshed
    +  }
    +
    +
    +  override def lookupRelation(name: TableIdentifier): LogicalPlan = {
    +    val rtnRelation = super.lookupRelation(name)
    +    var toRefreshRelation = false
    +    rtnRelation match {
    +      case SubqueryAlias(_,
    +      LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) =>
    +        toRefreshRelation = refreshRelationFromCache(name, carbonDatasourceHadoopRelation)
    +      case LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
    +        toRefreshRelation = refreshRelationFromCache(name, carbonDatasourceHadoopRelation)
    +      case _ =>
    +    }
    +
    +    if (toRefreshRelation) {
    +      super.lookupRelation(name)
    +    } else {
    +      rtnRelation
    +    }
    +  }
    +}
    +
    +/**
    + * Session state implementation to override sql parser and adding strategies
    + *
    + * @param sparkSession
    + */
    +class CarbonSessionStateBuilder(sparkSession: SparkSession,
    +    parentState: Option[SessionState] = None)
    +  extends HiveSessionStateBuilder(sparkSession, parentState) {
    +
    +  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
    +
    +  experimentalMethods.extraStrategies =
    +    Seq(new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession))
    +  experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
    +
    +  /**
    +   * Internal catalog for managing table and database states.
    +   */
    +  /**
    +   * Create a [[CarbonSessionCatalogBuild]].
    +   */
    +  override protected lazy val catalog: CarbonSessionCatalog = {
    +    val catalog = new CarbonSessionCatalog(
    +      externalCatalog,
    +      session.sharedState.globalTempViewManager,
    +      functionRegistry,
    +      sparkSession,
    +      conf,
    +      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
    +      sqlParser,
    +      resourceLoader)
    +    parentState.foreach(_.catalog.copyStateTo(catalog))
    +    catalog
    +  }
    +
    +  private def externalCatalog: HiveExternalCatalog =
    +    session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
    +
    +  /**
    +   * Create a Hive aware resource loader.
    +   */
    +  override protected lazy val resourceLoader: HiveSessionResourceLoader = {
    +    val client: HiveClient = externalCatalog.client.newSession()
    +    new HiveSessionResourceLoader(session, client)
    +  }
    +
    +  override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
    +
    +  override protected def analyzer: Analyzer = {
    +    new Analyzer(catalog, conf) {
    +
    +      override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
    +        new ResolveHiveSerdeTable(session) +:
    +        new FindDataSourceTable(session) +:
    +        new ResolveSQLOnFile(session) +:
    +        new CarbonIUDAnalysisRule(sparkSession) +:
    +        CarbonPreInsertionCasts +: customResolutionRules
    +
    +      override val extendedCheckRules: Seq[LogicalPlan => Unit] =
    +      PreWriteCheck :: HiveOnlyCheck :: Nil
    +
    +      override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
    +        new DetermineTableStats(session) +:
    +        RelationConversions(conf, catalog) +:
    +        PreprocessTableCreation(session) +:
    +        PreprocessTableInsertion(conf) +:
    +        DataSourceAnalysis(conf) +:
    +        HiveAnalysis +:
    +        customPostHocResolutionRules
    +    }
    +  }
    +
    +  override protected def newBuilder: NewBuilder = new CarbonSessionStateBuilder(_, _)
    +
    +}
    +
    +
    +class CarbonOptimizer(
    +    catalog: SessionCatalog,
    +    conf: SQLConf,
    +    experimentalMethods: ExperimentalMethods)
    +  extends SparkOptimizer(catalog, conf, experimentalMethods) {
    +
    +  override def execute(plan: LogicalPlan): LogicalPlan = {
    +    // In case scalar subquery add flag in relation to skip the decoder plan in optimizer rule, And
    +    // optimize whole plan at once.
    +    val transFormedPlan = plan.transform {
    +      case filter: Filter =>
    +        filter.transformExpressions {
    +          case s: ScalarSubquery =>
    +            val tPlan = s.plan.transform {
    +              case lr: LogicalRelation
    +                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
    +                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
    +                lr
    +            }
    +            ScalarSubquery(tPlan, s.children, s.exprId)
    +        }
    +    }
    +    super.execute(transFormedPlan)
    +  }
    +}
    +
    +class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) extends
    +  SparkSqlAstBuilder(conf) {
    +
    +  val helper = new CarbonHelperqlAstBuilder(conf, parser)
    +
    +  override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = {
    +    val fileStorage = helper.getFileStorage(ctx.createFileFormat)
    +
    +    if (fileStorage.equalsIgnoreCase("'carbondata'") ||
    +        fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
    +      helper
    +        .createCarbontable(ctx.createTableHeader,
    --- End diff --
   
    Done
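
    A usage note on the builder: on 2.2 the session state is produced by CarbonSessionStateBuilder(...).build() rather than by subclassing HiveSessionState as on 2.1. One way a session could pick it up (a sketch of the wiring, not a quote of the PR; the class sits in the org.apache.spark.sql package so SparkSession's private[sql] constructor is reachable):

        package org.apache.spark.sql

        import org.apache.spark.SparkContext
        import org.apache.spark.sql.hive.CarbonSessionStateBuilder
        import org.apache.spark.sql.internal.SessionState

        class CarbonSession(sc: SparkContext) extends SparkSession(sc) {
          // Build the Carbon-aware session state lazily, once per session.
          @transient
          override lazy val sessionState: SessionState =
            new CarbonSessionStateBuilder(this).build()
        }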


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153096022
 
    --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
    @@ -133,7 +134,9 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
       override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
     
       experimentalMethods.extraStrategies =
    -    Seq(new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession))
    +    Seq(new CarbonLateDecodeStrategy,
    +      new DDLStrategy(sparkSession)
    +    )
    --- End diff --
   
    Done


---

[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153096029
 
    --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
    @@ -84,8 +86,8 @@ class CarbonSessionCatalog(
         var toRefreshRelation = false
         rtnRelation match {
           case SubqueryAlias(_,
    -          LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
    -          _) =>
    +      LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
    +      _) =>
    --- End diff --
   
    Done


---