[GitHub] [carbondata] akkio-97 commented on a change in pull request #4129: [CARBONDATA-4179] Support renaming of complex columns (array/struct)

Posted by GitBox on
URL: http://apache-carbondata-dev-mailing-list-archive.168.s1.nabble.com/GitHub-carbondata-akkio-97-opened-a-new-pull-request-4129-WIP-alter-rename-complex-types-tp108015p108717.html


akkio-97 commented on a change in pull request #4129:
URL: https://github.com/apache/carbondata/pull/4129#discussion_r645487350



##########
File path: docs/ddl-of-carbondata.md
##########
@@ -805,7 +805,7 @@ Users can specify which columns to include and exclude for local dictionary gene
      2. If a column to be dropped has any Secondary index table created on them, drop column operation fails and the user will
      be asked to drop the corresponding SI table first before going for actual drop.
 
-   - #### CHANGE COLUMN NAME/TYPE/COMMENT
+   - #### change-column-nametype

Review comment:
       The link was not directing to its description because the anchor name was wrong. I did not choose this name; it was already there. I am changing it to change-column-name-type-comment.
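For context on why the anchor name matters: GitHub derives a heading's link target roughly by lower-casing the heading text, dropping every character that is not a letter, digit, underscore, space or hyphen, and turning spaces into hyphens (duplicate headings also get a numeric suffix, which is ignored here). The Scala sketch below approximates that rule. It is not GitHub's actual implementation, and `githubAnchor` is a hypothetical helper.

```scala
object AnchorSketch {
  // Approximates GitHub's heading-to-anchor rule (hypothetical helper):
  // lower-case the text, remove everything except letters, digits,
  // underscores, spaces and hyphens, then turn each space into a hyphen.
  def githubAnchor(heading: String): String =
    heading.trim.toLowerCase
      .replaceAll("[^\\p{L}\\p{N}_\\- ]", "")
      .replace(' ', '-')
}
```

Under this approximation the old heading `CHANGE COLUMN NAME/TYPE/COMMENT` yields `change-column-nametypecomment`, so a link spelled `#change-column-name-type-comment` would not resolve to it.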

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
##########
@@ -225,12 +264,46 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
             newColumnProperties.put(CarbonCommonConstants.COLUMN_COMMENT, newColumnComment.get)
             columnSchema.setColumnProperties(newColumnProperties)
           }
-          addColumnSchema = columnSchema
+          addedTableColumnSchema = columnSchema
+        } else if (isComplexChild(columnSchema)) {
+          if (alteredColumnNamesMap.contains(columnSchemaName)) {
+            // matches exactly
+            val newComplexChildName = alteredColumnNamesMap(columnSchemaName)
+            columnSchema.setColumn_name(newComplexChildName)
+            isSchemaEntryRequired = true
+          } else {
+            val alteredParent = checkIfParentIsAltered(columnSchemaName)
+            /*
+             * Lets say, if complex schema is: str struct<a: int>
+             * and if parent column is changed from str -> str2
+             * then its child name should also be changed from str.a -> str2.a
+             */
+            if (alteredParent != null) {
+              val newParent = alteredColumnNamesMap(alteredParent)
+              val newComplexChildName = newParent + columnSchemaName
+                .split(alteredParent)(1)
+              columnSchema.setColumn_name(newComplexChildName)
+              isSchemaEntryRequired = true
+            }
+          }
+        }
+        // validate duplicate child columns
+        if (uniqueColumnSet.contains(columnSchema.getColumn_name)) {
+          throw new UnsupportedOperationException("Duplicate columns are present")
+        }
+
+        // make a new schema evolution entry after column rename or datatype change
+        if (isSchemaEntryRequired) {
+          addedColumnsList ++= List(columnSchema)
+          deletedColumnsList ++= List(deletedColumnSchema)
           timeStamp = System.currentTimeMillis()
-          // make a new schema evolution entry after column rename or datatype change
-          schemaEvolutionEntry = AlterTableUtil
-            .addNewSchemaEvolutionEntry(timeStamp, addColumnSchema, deletedColumnSchema)
+          schemaEvolutionEntry = AlterTableUtil.addNewSchemaEvolutionEntry(schemaEvolutionEntry,

Review comment:
       Done.
   
   Yes, I have changed the API to consider the altered column names of both the children and the parent.
   Only the added and deleted column lists are now passed as parameters, and I have retained the null check.
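The parent-rename branch in the hunk above (`str` -> `str2` turning `str.a` into `str2.a`) boils down to replacing the leading parent prefix of a dotted child name. A minimal sketch follows. It assumes the map holds fully qualified old -> new names, which is how the hunk appears to read `alteredColumnNamesMap`, and `renameChild` is a hypothetical helper. It uses `startsWith`/`stripPrefix` instead of `String.split`, because `split` interprets its argument as a regular expression and cuts at every occurrence. For a child named `str.str.a`, `columnSchemaName.split("str")(1)` is `"."`, not `".str.a"`.

```scala
object ChildRenameSketch {
  // Returns the new name of a complex child column, or None if neither the
  // child itself nor any of its ancestors was renamed.
  // `renamed` maps old fully qualified names (e.g. "str", "str.a") to new ones.
  def renameChild(childName: String, renamed: Map[String, String]): Option[String] =
    renamed.get(childName).orElse {
      // Ancestors are keys that form a dotted prefix of the child name.
      val ancestors = renamed.keys.filter(parent => childName.startsWith(parent + "."))
      if (ancestors.isEmpty) None
      else {
        // The longest (closest) renamed ancestor wins, so nested renames compose.
        val parent = ancestors.maxBy(_.length)
        Some(renamed(parent) + childName.stripPrefix(parent))
      }
    }
}
```

Matching on `parent + "."` also keeps a column such as `strx.a` from being mistaken for a child of `str`.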

##########
File path: integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
##########
@@ -357,18 +358,30 @@ object AlterTableUtil {
   /**
    * This method create a new SchemaEvolutionEntry and adds to SchemaEvolutionEntry List
    *
-   * @param addColumnSchema          added new column schema
-   * @param deletedColumnSchema      old column schema which is deleted
+   * @param addColumnSchema     added new column schema
+   * @param deletedColumnSchema old column schema which is deleted
+   * @param addedColumnsList    list of added column schemas
+   * @param deletedColumnsList  list of deleted column schemas
    * @return
    */
   def addNewSchemaEvolutionEntry(
-      timeStamp: Long,
+      schemaEvolutionEntry: SchemaEvolutionEntry,
       addColumnSchema: org.apache.carbondata.format.ColumnSchema,
-      deletedColumnSchema: org.apache.carbondata.format.ColumnSchema): SchemaEvolutionEntry = {
-    val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
-    schemaEvolutionEntry.setAdded(List(addColumnSchema).asJava)
-    schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava)
-    schemaEvolutionEntry
+      deletedColumnSchema: org.apache.carbondata.format.ColumnSchema,

Review comment:
       Done.
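The reworked `addNewSchemaEvolutionEntry` records every renamed parent and child column in a single evolution entry instead of one added/removed pair. A simplified sketch of that accumulation is shown below. `Column` and `EvolutionEntry` are stand-ins for the Thrift `ColumnSchema` and `SchemaEvolutionEntry` classes, and `addToEntry` is hypothetical, because the full new signature is cut off in the diff. Keeping the first entry's timestamp is also an assumption.

```scala
object EvolutionSketch {
  // Simplified stand-ins for the Thrift ColumnSchema / SchemaEvolutionEntry types.
  final case class Column(name: String)
  final case class EvolutionEntry(timeStamp: Long, added: List[Column], removed: List[Column])

  // Creates the entry on first use; afterwards overwrites its lists with the
  // full accumulated lists, so one ALTER statement yields a single entry.
  def addToEntry(
      existing: Option[EvolutionEntry],
      timeStamp: Long,
      added: List[Column],
      removed: List[Column]): EvolutionEntry =
    existing match {
      case None => EvolutionEntry(timeStamp, added, removed)
      // Assumption: the original timestamp is kept on later updates.
      case Some(entry) => entry.copy(added = added, removed = removed)
    }
}
```

Using `Option` instead of a nullable variable is a choice made for this sketch. The command itself keeps a null-initialised `schemaEvolutionEntry`.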

##########
File path: integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
##########
@@ -1063,4 +1076,106 @@ object AlterTableUtil {
       }
     }
   }
+
+  /**
+   * This method checks the structure of the old and new complex columns, and-
+   * 1. throws exception if the number of complex-levels in both columns does not match
+   * 2. throws exception if the number of children of both columns does not match
+   * 3. creates alteredColumnNamesMap: new_column_name -> datatype. Here new_column_name are those
+   *    names of the columns that are altered.
+   * These maps will later be used while altering the table schema
+   */
+  def validateComplexStructure(oldDimensionList: List[CarbonDimension],
+      newDimensionList: List[DataTypeInfo],
+      alteredColumnNamesMap: mutable.LinkedHashMap[String, String]): Unit = {
+    if (oldDimensionList == null && newDimensionList == null) {
+      throw new UnsupportedOperationException("Both old and new dimensions are null")
+    } else if (oldDimensionList == null || newDimensionList == null) {
+      throw new UnsupportedOperationException("Either the old or the new dimension is null")
+    } else if (oldDimensionList.size != newDimensionList.size) {
+      throw new UnsupportedOperationException(
+        "Number of children of old and new complex columns are not the same")
+    } else {
+      for ((newDimensionInfo, i) <- newDimensionList.zipWithIndex) {
+        val oldDimensionInfo = oldDimensionList(i)
+        val old_column_name = oldDimensionInfo.getColName.split('.').last

Review comment:
       Done. There was no constant for the dot character, so I created one.
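`validateComplexStructure` compares the old and new complex types level by level. It rejects a change in the number of children and records each child whose name differs. The recursive sketch below shows that idea over a simplified tree. `Field` and `collectRenames` are hypothetical stand-ins for the `CarbonDimension`/`DataTypeInfo` handling. Note one difference from the javadoc above, which describes a new_column_name -> datatype map: this sketch maps old qualified names to new ones, which is how the rename hunk appears to consume `alteredColumnNamesMap`.

```scala
import scala.collection.mutable

object ComplexStructureSketch {
  // Simplified complex type: a named field with ordered children
  // (empty for primitive leaves).
  final case class Field(name: String, children: List[Field] = Nil)

  // Recursively compares old and new structures. Throws if the number of
  // children differs at any level (this also catches a changed nesting depth);
  // otherwise records oldQualifiedName -> newQualifiedName for every field
  // whose simple name changed (ignoring case).
  def collectRenames(
      oldField: Field,
      newField: Field,
      oldPrefix: String,
      newPrefix: String,
      renames: mutable.LinkedHashMap[String, String]): Unit = {
    val oldName = if (oldPrefix.isEmpty) oldField.name else s"$oldPrefix.${oldField.name}"
    val newName = if (newPrefix.isEmpty) newField.name else s"$newPrefix.${newField.name}"
    if (!oldField.name.equalsIgnoreCase(newField.name)) renames.put(oldName, newName)
    if (oldField.children.size != newField.children.size) {
      throw new UnsupportedOperationException(
        s"Number of children of old and new complex columns are not the same for $oldName")
    }
    oldField.children.zip(newField.children).foreach { case (o, n) =>
      collectRenames(o, n, oldName, newName, renames)
    }
  }
}
```

A `LinkedHashMap` keeps parents ahead of their children, mirroring the `mutable.LinkedHashMap` parameter in the diff.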

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
##########
@@ -180,33 +200,50 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
       }
       if (alterTableColRenameAndDataTypeChangeModel.isColumnRename) {
         // validate the columns to be renamed
-        validColumnsForRenaming(carbonColumns, oldCarbonColumn.head, carbonTable)
+        validColumnsForRenaming(carbonColumns, carbonTable)
       }
       if (isDataTypeChange) {
         // validate the columns to change datatype
-        validateColumnDataType(alterTableColRenameAndDataTypeChangeModel.dataTypeInfo,
+        AlterTableUtil.validateColumnDataType(alterTableColRenameAndDataTypeChangeModel
+          .dataTypeInfo,
           oldCarbonColumn.head)
       }
       // read the latest schema file
       val tableInfo: TableInfo =
         metaStore.getThriftTableInfo(carbonTable)
       // maintain the added column for schema evolution history
-      var addColumnSchema: ColumnSchema = null
+      var addedTableColumnSchema: ColumnSchema = null
       var deletedColumnSchema: ColumnSchema = null
       var schemaEvolutionEntry: SchemaEvolutionEntry = null
+      var addedColumnsList: List[ColumnSchema] = List.empty[ColumnSchema]
+      var deletedColumnsList: List[ColumnSchema] = List.empty[ColumnSchema]
       val columnSchemaList = tableInfo.fact_table.table_columns.asScala.filter(!_.isInvisible)
+      // to validate duplicate children columns
+      var uniqueColumnSet: mutable.Set[String] = mutable.Set.empty
 
+      /*
+      * columnSchemaList is a flat structure containing all column schemas including both parent
+      * and child.
+      * It is iterated and rename/change-datatype update are made in this list itself.
+      * Entry is made to the schemaEvolutionEntry for each of the update.
+      */
       columnSchemaList.foreach { columnSchema =>
-        if (columnSchema.column_name.equalsIgnoreCase(oldColumnName)) {
-          deletedColumnSchema = columnSchema.deepCopy()
-          if (alterTableColRenameAndDataTypeChangeModel.isColumnRename) {
-            // if only column rename, just get the column schema and rename, make a
-            // schemaEvolutionEntry
+        val columnSchemaName = columnSchema.column_name

Review comment:
       Done.
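`columnSchemaList` is flat, with parents and children side by side. A duplicate check after renaming therefore only needs each column's final qualified name. The minimal sketch below uses the Boolean returned by `mutable.Set.add`, which records a name and reports a repeat in one step. Comparing lower-cased names is an assumption, chosen because this hunk matches column names with `equalsIgnoreCase`. `findDuplicate` is hypothetical. The hunks shown only call `uniqueColumnSet.contains`; where names are added to that set is not visible here.

```scala
import scala.collection.mutable

object DuplicateCheckSketch {
  // Returns the first qualified name that repeats an earlier one (ignoring case),
  // or None when all names are unique.
  def findDuplicate(names: Seq[String]): Option[String] = {
    val seen = mutable.Set.empty[String]
    // Set.add returns false when the element was already present.
    names.find(name => !seen.add(name.toLowerCase))
  }
}
```

A caller would throw `UnsupportedOperationException("Duplicate columns are present")` when this returns `Some`.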

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
##########
@@ -180,33 +200,50 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
       }
       if (alterTableColRenameAndDataTypeChangeModel.isColumnRename) {
         // validate the columns to be renamed
-        validColumnsForRenaming(carbonColumns, oldCarbonColumn.head, carbonTable)
+        validColumnsForRenaming(carbonColumns, carbonTable)
       }
       if (isDataTypeChange) {
         // validate the columns to change datatype
-        validateColumnDataType(alterTableColRenameAndDataTypeChangeModel.dataTypeInfo,
+        AlterTableUtil.validateColumnDataType(alterTableColRenameAndDataTypeChangeModel
+          .dataTypeInfo,
           oldCarbonColumn.head)
       }
       // read the latest schema file
       val tableInfo: TableInfo =
         metaStore.getThriftTableInfo(carbonTable)
       // maintain the added column for schema evolution history
-      var addColumnSchema: ColumnSchema = null
+      var addedTableColumnSchema: ColumnSchema = null
       var deletedColumnSchema: ColumnSchema = null
       var schemaEvolutionEntry: SchemaEvolutionEntry = null
+      var addedColumnsList: List[ColumnSchema] = List.empty[ColumnSchema]
+      var deletedColumnsList: List[ColumnSchema] = List.empty[ColumnSchema]
       val columnSchemaList = tableInfo.fact_table.table_columns.asScala.filter(!_.isInvisible)
+      // to validate duplicate children columns
+      var uniqueColumnSet: mutable.Set[String] = mutable.Set.empty
 
+      /*
+      * columnSchemaList is a flat structure containing all column schemas including both parent
+      * and child.
+      * It is iterated and rename/change-datatype update are made in this list itself.
+      * Entry is made to the schemaEvolutionEntry for each of the update.
+      */
       columnSchemaList.foreach { columnSchema =>
-        if (columnSchema.column_name.equalsIgnoreCase(oldColumnName)) {
-          deletedColumnSchema = columnSchema.deepCopy()
-          if (alterTableColRenameAndDataTypeChangeModel.isColumnRename) {
-            // if only column rename, just get the column schema and rename, make a
-            // schemaEvolutionEntry
+        val columnSchemaName = columnSchema.column_name
+        val isTableColumn = columnSchemaName.equalsIgnoreCase(oldColumnName)

Review comment:
       Changed it to isTableColumnAltered.




--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.