[GitHub] carbondata pull request #3027: [CARBONDATA-3202]update the schema to session...

classic Classic list List threaded Threaded
32 messages Options
12
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #3027: [CARBONDATA-3202]update the schema to session...

qiuchenjian-2
GitHub user akashrn5 opened a pull request:

    https://github.com/apache/carbondata/pull/3027

    [CARBONDATA-3202]update the schema to session catalog after add column, drop column and column rename

    ### Why this PR?
   
    **Problem:**For alter table rename, once we change the table name in carbon, we fire alter table rename DDL using hive client. But for add, drop and column rename Spark does not support there features, but hive supports. so after rename, or add or drop column, the new updated schema is not updated in catalog.
   
    **Solution:**We can directly call the spark API **alterTableDataSchema** by passing the updated schema, which in turn updates the shema in sessioncatalog
   
    Be sure to do all of the following checklist to help us incorporate
    your contribution quickly and easily:
   
     - [x] Any interfaces changed?
     YES
     - [x] Any backward compatibility impacted?
     NA
     - [x] Document update required?
    NA
     - [x] Testing done
    tested in three node cluster for various spark versions
            Please provide details on
            - Whether new unit test cases have been added or why no new tests are required?
            - How it is tested? Please attach test report.
            - Is it a performance related change? Please attach the performance test report.
            - Any additional information to help reviewers in testing this change.
           
     - [x] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
    NA


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/akashrn5/incubator-carbondata addcolumn

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/3027.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3027
   
----
commit ee78136e55b309a4521914fdee57adf59cda8531
Author: akashrn5 <akashnilugal@...>
Date:   2018-12-27T06:01:44Z

    update the schema to session catalog after add column, drop column and column rename

----


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #3027: [CARBONDATA-3202]update the schema to session catalo...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/3027
 
    Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2223/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #3027: [CARBONDATA-3202]update the schema to session catalo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/3027
 
    Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2256/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #3027: [CARBONDATA-3202]update the schema to session catalo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/3027
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2049/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #3027: [CARBONDATA-3202]update the schema to session catalo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/3027
 
    Build Failed  with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10301/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #3027: [CARBONDATA-3202]update the schema to session catalo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/3027
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2058/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #3027: [CARBONDATA-3202]update the schema to session...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/3027#discussion_r244271732
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala ---
    @@ -93,11 +93,17 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
           schemaEvolutionEntry.setAdded(newCols.toList.asJava)
           val thriftTable = schemaConverter
             .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
    +      // carbon columns based on schema order
    +      val carbonColumns = carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala
    +        .collect { case carbonColumn => carbonColumn.getColumnSchema }
    +        .filter(!_.isInvisible)
    +      // sort the new columns based on schema order
    +      val sortedColsBasedActualSchemaOrder = newCols.sortBy(a => a.getSchemaOrdinal)
           val (tableIdentifier, schemaParts, cols) = AlterTableUtil.updateSchemaInfo(
               carbonTable,
               schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
               thriftTable,
    -          Some(newCols))(sparkSession)
    +          Some(carbonColumns ++ sortedColsBasedActualSchemaOrder))(sparkSession)
    --- End diff --
   
    `AlterTableUtil.updateSchemaInfo` is not making use of columns passed so remove the method argument and use columns for changing the hive schema


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #3027: [CARBONDATA-3202]update the schema to session...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/3027#discussion_r244271907
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala ---
    @@ -262,13 +263,26 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
           carbonTable: CarbonTable,
           tableInfo: TableInfo,
           addColumnSchema: ColumnSchema,
    -      schemaEvolutionEntry: SchemaEvolutionEntry): Unit = {
    +      schemaEvolutionEntry: SchemaEvolutionEntry,
    +      oldCarbonColumn: CarbonColumn): Unit = {
         val schemaConverter = new ThriftWrapperSchemaConverterImpl
    -    val a = List(schemaConverter.fromExternalToWrapperColumnSchema(addColumnSchema))
    +    // get the carbon column in schema order
    +    val carbonColumns = carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala
    +      .filter(!_.isInvisible).collect{case carbonColumn => carbonColumn.getColumnSchema}
    --- End diff --
   
    Move filter operation to collect


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #3027: [CARBONDATA-3202]update the schema to session...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/3027#discussion_r244270598
 
    --- Diff: integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala ---
    @@ -93,4 +98,34 @@ object CarbonSessionUtil {
         )
       }
     
    +  /**
    +   * This method alter the table for datatype change or column rename operation, and update the
    +   * external catalog directly
    +   *
    +   * @param tableIdentifier tableIdentifier for table
    +   * @param cols            all the column of table, which are updated with datatype change of
    +   *                        new column name
    +   * @param schemaParts     schemaParts
    +   * @param sparkSession    sparkSession
    +   */
    +  def alterExternalCatalogForTableWithUpdatedSchema(tableIdentifier: TableIdentifier,
    +      cols: Option[Seq[ColumnSchema]],
    +      schemaParts: String,
    +      sparkSession: SparkSession): Unit = {
    +    val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession)
    +    val colArray: scala.collection.mutable.ArrayBuffer[StructField] = ArrayBuffer()
    +    cols.get.foreach(column =>
    +      if (!column.isInvisible) {
    +        colArray += StructField(column.getColumnName,
    +          SparkTypeConverter
    +            .convertCarbonToSparkDataType(column,
    +              carbonTable))
    +      }
    +    )
    +    sparkSession.sessionState.catalog.externalCatalog
    +      .alterTableDataSchema(tableIdentifier.database.get,
    --- End diff --
   
    add a comment for the usage of API `alterTableDataSchema` to explain its purpose


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #3027: [CARBONDATA-3202]update the schema to session...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/3027#discussion_r244272092
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala ---
    @@ -262,13 +263,26 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
           carbonTable: CarbonTable,
           tableInfo: TableInfo,
           addColumnSchema: ColumnSchema,
    -      schemaEvolutionEntry: SchemaEvolutionEntry): Unit = {
    +      schemaEvolutionEntry: SchemaEvolutionEntry,
    +      oldCarbonColumn: CarbonColumn): Unit = {
         val schemaConverter = new ThriftWrapperSchemaConverterImpl
    -    val a = List(schemaConverter.fromExternalToWrapperColumnSchema(addColumnSchema))
    +    // get the carbon column in schema order
    +    val carbonColumns = carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala
    +      .filter(!_.isInvisible).collect{case carbonColumn => carbonColumn.getColumnSchema}
    +    // get the schema ordinal of the column for which the datatype changed or column is renamed
    +    val schemaOrdinal = carbonColumns.collect {
    --- End diff --
   
    Instead of collect try and use foreach


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #3027: [CARBONDATA-3202]update the schema to session...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/3027#discussion_r244271865
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala ---
    @@ -93,11 +93,17 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
           schemaEvolutionEntry.setAdded(newCols.toList.asJava)
           val thriftTable = schemaConverter
             .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
    +      // carbon columns based on schema order
    +      val carbonColumns = carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala
    +        .collect { case carbonColumn => carbonColumn.getColumnSchema }
    +        .filter(!_.isInvisible)
    --- End diff --
   
    Move filter operation in collect operation by adding if clause in the case statement


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #3027: [CARBONDATA-3202]update the schema to session...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/3027#discussion_r244270219
 
    --- Diff: integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala ---
    @@ -105,47 +106,37 @@ class CarbonHiveSessionCatalog(
           .asInstanceOf[HiveExternalCatalog].client
       }
     
    -  def alterTableRename(oldTableIdentifier: TableIdentifier,
    -      newTableIdentifier: TableIdentifier,
    -      newTablePath: String): Unit = {
    -    getClient().runSqlHive(
    -      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table } " +
    -      s"RENAME TO ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }")
    -    getClient().runSqlHive(
    -      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table} " +
    -      s"SET SERDEPROPERTIES" +
    -      s"('tableName'='${ newTableIdentifier.table }', " +
    -      s"'dbName'='${ oldTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')")
    -  }
    -
    -  override def alterTable(tableIdentifier: TableIdentifier,
    -      schemaParts: String,
    -      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
    -  : Unit = {
    -    getClient()
    -      .runSqlHive(s"ALTER TABLE ${tableIdentifier.database.get}.${ tableIdentifier.table } " +
    -                  s"SET TBLPROPERTIES(${ schemaParts })")
    -  }
    -
       override def alterAddColumns(tableIdentifier: TableIdentifier,
           schemaParts: String,
    -      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
    -  : Unit = {
    +      cols: Option[Seq[ColumnSchema]]): Unit = {
         alterTable(tableIdentifier, schemaParts, cols)
    +    CarbonSessionUtil
    +      .alterExternalCatalogForTableWithUpdatedSchema(tableIdentifier,
    +        cols,
    +        schemaParts,
    +        sparkSession)
       }
     
       override def alterDropColumns(tableIdentifier: TableIdentifier,
    --- End diff --
   
    Unify `alterDropColumns` and `alterAddColumns` into one method...keep interface methods same but move the common code to 1 method and call it from the interface methods


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #3027: [CARBONDATA-3202]update the schema to session catalo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/3027
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2060/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #3027: [CARBONDATA-3202]update the schema to session catalo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/3027
 
    Build Success with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10312/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #3027: [CARBONDATA-3202]update the schema to session catalo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/3027
 
    Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2264/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #3027: [CARBONDATA-3202]update the schema to session catalo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/3027
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2064/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #3027: [CARBONDATA-3202]update the schema to session catalo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user akashrn5 commented on the issue:

    https://github.com/apache/carbondata/pull/3027
 
    @manishgupta88 handled review coments, please review


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #3027: [CARBONDATA-3202]update the schema to session catalo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/3027
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2066/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #3027: [CARBONDATA-3202]update the schema to session catalo...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/3027
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2067/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #3027: [CARBONDATA-3202]update the schema to session...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/3027#discussion_r244306298
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala ---
    @@ -262,13 +263,28 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
           carbonTable: CarbonTable,
           tableInfo: TableInfo,
           addColumnSchema: ColumnSchema,
    -      schemaEvolutionEntry: SchemaEvolutionEntry): Unit = {
    +      schemaEvolutionEntry: SchemaEvolutionEntry,
    +      oldCarbonColumn: CarbonColumn): Unit = {
         val schemaConverter = new ThriftWrapperSchemaConverterImpl
    -    val a = List(schemaConverter.fromExternalToWrapperColumnSchema(addColumnSchema))
    -    val (tableIdentifier, schemaParts, cols) = AlterTableUtil.updateSchemaInfo(
    -      carbonTable, schemaEvolutionEntry, tableInfo, Some(a))(sparkSession)
    +    // get the carbon column in schema order
    +    val carbonColumns = carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala
    +      .collect { case carbonColumn if !carbonColumn.isInvisible => carbonColumn.getColumnSchema }
    +    // get the schema ordinal of the column for which the datatype changed or column is renamed
    +    var schemaOrdinal: Int = 0
    +    carbonColumns.foreach { carbonColumn =>
    +      if (carbonColumn.getColumnName.equalsIgnoreCase(oldCarbonColumn.getColName)) {
    +        schemaOrdinal = carbonColumns.indexOf(carbonColumn)
    --- End diff --
   
    Use filter function to achieve the required output


---
12