[GitHub] [carbondata] akashrn5 commented on a change in pull request #4129: [CARBONDATA-4179] Support renaming of complex columns (array/struct)

Posted by GitBox on
URL: http://apache-carbondata-dev-mailing-list-archive.168.s1.nabble.com/GitHub-carbondata-akkio-97-opened-a-new-pull-request-4129-WIP-alter-rename-complex-types-tp108015p108728.html


akashrn5 commented on a change in pull request #4129:
URL: https://github.com/apache/carbondata/pull/4129#discussion_r645573633



##########
File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
##########
@@ -1898,6 +1898,12 @@ private CarbonCommonConstants() {
    */
   public static final String POINT = ".";
 
+  /**
+   * POINT as a character
+   */
+  public static final char CHAR_POINT = '.';

Review comment:
       Please remove this; there is no need to add a char version. Use the existing `POINT` constant.
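
A side note on why reusing `POINT` needs a little care (an observation, not something stated in the thread): `String.split` treats its argument as a regular expression, so splitting on the `POINT` string constant requires quoting, whereas a char would split literally. A minimal sketch with a stand-in constant:

```java
import java.util.regex.Pattern;

// Sketch (stand-in constant): String.split takes a regex, so reusing the
// existing POINT ("." as a String) requires quoting it before splitting.
public class SplitOnPoint {
    static final String POINT = "."; // stands in for CarbonCommonConstants.POINT

    public static void main(String[] args) {
        String column = "parent.child.leaf";
        // Unquoted "." is a regex that matches every character: no tokens survive.
        System.out.println(column.split(POINT).length);      // 0
        // Pattern.quote makes the dot literal, giving the expected parts.
        String[] parts = column.split(Pattern.quote(POINT));
        System.out.println(parts[parts.length - 1]);         // leaf
    }
}
```
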

##########
File path: docs/ddl-of-carbondata.md
##########
@@ -805,7 +805,7 @@ Users can specify which columns to include and exclude for local dictionary gene
      2. If a column to be dropped has any Secondary index table created on them, drop column operation fails and the user will
      be asked to drop the corresponding SI table first before going for actual drop.
 
-   - #### CHANGE COLUMN NAME/TYPE/COMMENT
+   - #### change-column-name-type-comment

Review comment:
       Please make it capital letters as before, and check that the link works. Also, do not put `-` between the words; refer to the `SET/UNSET` section and follow the same convention here.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
##########
@@ -34,31 +34,26 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.datatype.DecimalType
+import org.apache.carbondata.core.metadata.datatype.{DataTypes, DecimalType}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
+import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
 import org.apache.carbondata.events.{AlterTableColRenameAndDataTypeChangePostEvent, AlterTableColRenameAndDataTypeChangePreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
 import org.apache.carbondata.spark.util.DataTypeConverterUtil
 
 abstract class CarbonAlterTableColumnRenameCommand(oldColumnName: String, newColumnName: String)
   extends MetadataCommand {
 
-  protected def validColumnsForRenaming(carbonColumns: mutable.Buffer[CarbonColumn],
-      oldCarbonColumn: CarbonColumn,
+  protected def validColumnsForRenaming(columnSchemaList: mutable.Buffer[ColumnSchema],
+      alteredColumnNamesMap: mutable.LinkedHashMap[String, String],
       carbonTable: CarbonTable): Unit = {
     // check whether new column name is already an existing column name
-    if (carbonColumns.exists(_.getColName.equalsIgnoreCase(newColumnName))) {
-      throw new MalformedCarbonCommandException(s"Column Rename Operation failed. New " +
-                                                s"column name $newColumnName already exists" +
-                                                s" in table ${ carbonTable.getTableName }")
-    }
-
-    // if the column rename is for complex column, block the operation
-    if (oldCarbonColumn.isComplex) {
-      throw new MalformedCarbonCommandException(s"Column Rename Operation failed. Rename " +
-                                                s"column is unsupported for complex datatype " +
-                                                s"column ${ oldCarbonColumn.getColName }")
+    for ((oldColumnName, newColumnName) <- alteredColumnNamesMap) {
+      if (columnSchemaList.exists(_.getColumn_name.equalsIgnoreCase(newColumnName))) {
+        throw new MalformedCarbonCommandException(s"Column Rename Operation failed. New " +
+                                                  s"column name $newColumnName already exists" +
+                                                  s" in table ${ carbonTable.getTableName }")
+      }

Review comment:
       Please do not use a standard for loop; check and throw the exception in a functional way.
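
The PR code is Scala, but the reviewer's point — replace the explicit loop with a functional lookup that fails fast — can be sketched. This Java version is illustrative only; names mirror the diff, and in Scala the same shape would be `find(...).foreach(name => throw ...)`:

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Illustrative sketch: instead of looping and throwing inside the loop body,
// find the first offending rename functionally and throw once.
public class RenameValidation {
    static void validColumnsForRenaming(List<String> existingColumnNames,
                                        Map<String, String> alteredColumnNamesMap,
                                        String tableName) {
        Optional<String> duplicate = alteredColumnNamesMap.values().stream()
                .filter(newName -> existingColumnNames.stream()
                        .anyMatch(existing -> existing.equalsIgnoreCase(newName)))
                .findFirst();
        if (duplicate.isPresent()) {
            throw new IllegalArgumentException(
                "Column Rename Operation failed. New column name "
                + duplicate.get() + " already exists in table " + tableName);
        }
    }
}
```

For example, renaming a column to `name` in a table that already has a `name` column throws, while renaming to an unused name passes silently.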
   

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
##########
@@ -69,12 +64,11 @@ abstract class CarbonAlterTableColumnRenameCommand(oldColumnName: String, newCol
           if (col.getColumnName.equalsIgnoreCase(oldColumnName)) {
             throw new MalformedCarbonCommandException(
               s"Column Rename Operation failed. Renaming " +
-                s"the bucket column $oldColumnName is not " +
-                s"allowed")
+              s"the bucket column $oldColumnName is not " +
+              s"allowed")
           }
       }
     }
-

Review comment:
       remove this

##########
File path: docs/ddl-of-carbondata.md
##########
@@ -866,6 +880,7 @@ Users can specify which columns to include and exclude for local dictionary gene
      **NOTE:**
      * Merge index is supported on streaming table from carbondata 2.0.1 version.
      But streaming segments (ROW_V1) cannot create merge index.
+     * Rename column name is only supported for complex columns including array and struct.

Review comment:
       ```suggestion
        * Rename column name is not supported for MAP type.
       ```

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
##########
@@ -69,12 +64,11 @@ abstract class CarbonAlterTableColumnRenameCommand(oldColumnName: String, newCol
           if (col.getColumnName.equalsIgnoreCase(oldColumnName)) {
             throw new MalformedCarbonCommandException(
               s"Column Rename Operation failed. Renaming " +
-                s"the bucket column $oldColumnName is not " +
-                s"allowed")
+              s"the bucket column $oldColumnName is not " +

Review comment:
       revert unnecessary change

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
##########
@@ -143,27 +143,49 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
       val newColumnPrecision = alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.precision
       val newColumnScale = alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.scale
       // set isDataTypeChange flag
-      if (oldCarbonColumn.head.getDataType.getName
-        .equalsIgnoreCase(alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.dataType)) {
+      val oldDatatype = oldCarbonColumn.head.getDataType
+      val newDatatype = alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.dataType
+      if (isColumnRename && (DataTypes.isMapType(oldDatatype) ||
+                             newDatatype.equalsIgnoreCase(CarbonCommonConstants.MAP))) {
+        throw new UnsupportedOperationException(
+          "Alter rename is unsupported for Map datatype column")
+      }
+      if (oldDatatype.getName.equalsIgnoreCase(newDatatype)) {
         val newColumnPrecision =
           alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.precision
         val newColumnScale = alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.scale
         // if the source datatype is decimal and there is change in precision and scale, then
         // along with rename, datatype change is also required for the command, so set the
         // isDataTypeChange flag to true in this case
-        if (oldCarbonColumn.head.getDataType.getName.equalsIgnoreCase("decimal") &&
-            (oldCarbonColumn.head.getDataType.asInstanceOf[DecimalType].getPrecision !=
+        if (DataTypes.isDecimal(oldDatatype) &&
+            (oldDatatype.asInstanceOf[DecimalType].getPrecision !=
              newColumnPrecision ||
-             oldCarbonColumn.head.getDataType.asInstanceOf[DecimalType].getScale !=
+             oldDatatype.asInstanceOf[DecimalType].getScale !=
              newColumnScale)) {
           isDataTypeChange = true
         }
+        if (DataTypes.isArrayType(oldDatatype) || DataTypes.isStructType(oldDatatype)) {
+          val oldParent = oldCarbonColumn.head
+          val oldChildren = oldParent.asInstanceOf[CarbonDimension].getListOfChildDimensions.asScala
+            .toList
+          AlterTableUtil.validateComplexStructure(oldChildren,
+            alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.getChildren(),
+            alteredColumnNamesMap)
+        }
       } else {
+        if (oldDatatype.isComplexType ||
+            newDatatype.equalsIgnoreCase(CarbonCommonConstants.ARRAY) ||
+            newDatatype.equalsIgnoreCase(CarbonCommonConstants.STRUCT)) {
+          throw new UnsupportedOperationException(
+            "Old and new complex columns are not compatible in structure")

Review comment:
       Only `oldDatatype.isComplexType` is required; you can remove the other conditions.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
##########
@@ -1088,15 +1089,15 @@ object CarbonParserUtil {
   private def appendParentForEachChild(field: Field, parentName: String): Field = {
     field.dataType.getOrElse("NIL") match {
       case "Array" | "Struct" | "Map" =>
-        val newChildren = field.children
-          .map(_.map(appendParentForEachChild(_, parentName + "." + field.column)))
-        field.copy(column = parentName + "." + field.column,
-          name = Some(parentName + "." + field.name.getOrElse(None)),
+        val newChildren = field.children.map(_.map(appendParentForEachChild(_,
+            parentName + CarbonCommonConstants.POINT + field.column)))
+        field.copy(column = parentName + CarbonCommonConstants.POINT + field.column,
+          name = Some(parentName + CarbonCommonConstants.POINT + field.name.getOrElse(None)),

Review comment:
       Please fix the code style here once (Ctrl + Shift + Alt + L).

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
##########
@@ -143,27 +143,49 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
       val newColumnPrecision = alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.precision
       val newColumnScale = alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.scale
       // set isDataTypeChange flag
-      if (oldCarbonColumn.head.getDataType.getName
-        .equalsIgnoreCase(alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.dataType)) {
+      val oldDatatype = oldCarbonColumn.head.getDataType
+      val newDatatype = alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.dataType
+      if (isColumnRename && (DataTypes.isMapType(oldDatatype) ||
+                             newDatatype.equalsIgnoreCase(CarbonCommonConstants.MAP))) {
+        throw new UnsupportedOperationException(
+          "Alter rename is unsupported for Map datatype column")

Review comment:
       Please check only for map type and throw the exception at the very beginning. Always perform validations before any other operation, to avoid unnecessary code execution.
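
The validate-first pattern the reviewer is asking for can be sketched with stand-in types (these are hypothetical simplifications, not the CarbonData API): reject the unsupported map case before any other work, so no locks, metadata reads, or events fire for an operation that is doomed to fail.

```java
// Minimal sketch (stand-in types): fail fast on map columns before doing
// anything else in the rename flow.
public class EarlyValidation {
    enum Kind { MAP, ARRAY, STRUCT, INT }

    static String alterRename(Kind oldType, String newTypeName, boolean isColumnRename) {
        // validation first: map columns cannot be renamed
        if (isColumnRename
                && (oldType == Kind.MAP || "map".equalsIgnoreCase(newTypeName))) {
            throw new UnsupportedOperationException(
                "Alter rename is unsupported for Map datatype column");
        }
        // ... the rest of the rename flow runs only after validation passes ...
        return "renamed";
    }
}
```
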

##########
File path: integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
##########
@@ -1063,4 +1069,108 @@ object AlterTableUtil {
       }
     }
   }
+
+  /**
+   * This method checks the structure of the old and new complex columns, and-
+   * 1. throws exception if the number of complex-levels in both columns does not match
+   * 2. throws exception if the number of children of both columns does not match
+   * 3. creates alteredColumnNamesMap: new_column_name -> datatype. Here new_column_name are those
+   *    names of the columns that are altered.
+   * These maps will later be used while altering the table schema
+   */
+  def validateComplexStructure(oldDimensionList: List[CarbonDimension],
+      newDimensionList: List[DataTypeInfo],
+      alteredColumnNamesMap: mutable.LinkedHashMap[String, String]): Unit = {
+    if (oldDimensionList == null && newDimensionList == null) {
+      throw new UnsupportedOperationException("Both old and new dimensions are null")
+    } else if (oldDimensionList == null || newDimensionList == null) {
+      throw new UnsupportedOperationException("Either the old or the new dimension is null")
+    } else if (oldDimensionList.size != newDimensionList.size) {
+      throw new UnsupportedOperationException(
+        "Number of children of old and new complex columns are not the same")
+    } else {
+      for ((newDimensionInfo, i) <- newDimensionList.zipWithIndex) {
+        val oldDimensionInfo = oldDimensionList(i)
+        val old_column_name = oldDimensionInfo.getColName.split(CarbonCommonConstants.CHAR_POINT).
+          last
+        val old_column_datatype = oldDimensionInfo.getDataType.getName
+        val new_column_name = newDimensionInfo.columnName.split(CarbonCommonConstants.CHAR_POINT).
+          last
+        val new_column_datatype = newDimensionInfo.dataType
+        if (!old_column_datatype.equalsIgnoreCase(new_column_datatype)) {
+          /**
+           * datatypes of complex children cannot be altered. So throwing exception for now.
+           * TODO: use alteredColumnDatatypesMap to update the carbon schema
+           */
+          throw new UnsupportedOperationException(

Review comment:
       Please use `//` for this comment instead of a `/** ... */` doc comment.

##########
File path: integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
##########
@@ -357,18 +358,23 @@ object AlterTableUtil {
   /**
    * This method create a new SchemaEvolutionEntry and adds to SchemaEvolutionEntry List
    *
-   * @param addColumnSchema          added new column schema
-   * @param deletedColumnSchema      old column schema which is deleted
+   * @param addedColumnsList    list of added column schemas
+   * @param deletedColumnsList  list of deleted column schemas
    * @return
    */
   def addNewSchemaEvolutionEntry(
-      timeStamp: Long,
-      addColumnSchema: org.apache.carbondata.format.ColumnSchema,
-      deletedColumnSchema: org.apache.carbondata.format.ColumnSchema): SchemaEvolutionEntry = {
-    val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
-    schemaEvolutionEntry.setAdded(List(addColumnSchema).asJava)
-    schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava)
-    schemaEvolutionEntry
+      schemaEvolutionEntry: SchemaEvolutionEntry,
+      addedColumnsList: List[org.apache.carbondata.format.ColumnSchema],
+      deletedColumnsList: List[org.apache.carbondata.format.ColumnSchema]): SchemaEvolutionEntry = {
+    val timeStamp = System.currentTimeMillis()
+    var newSchemaEvolutionEntry = schemaEvolutionEntry;
+    if (newSchemaEvolutionEntry == null) {
+      newSchemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
+    }
+    newSchemaEvolutionEntry.setTime_stamp(timeStamp)

Review comment:
       Don't change the timestamp: add it to the entry only once and change the if condition at line 371 instead. Don't override the variable.
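
A hedged sketch of the suggested shape, using a simplified stand-in for the thrift `SchemaEvolutionEntry`: only a brand-new entry gets a timestamp, and an entry passed in by the caller is reused without its timestamp being overwritten.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in types; the real entry is a thrift-generated class.
public class EvolutionEntries {
    static class SchemaEvolutionEntry {
        final long timeStamp;
        final List<String> added = new ArrayList<>();
        final List<String> removed = new ArrayList<>();
        SchemaEvolutionEntry(long timeStamp) { this.timeStamp = timeStamp; }
    }

    static SchemaEvolutionEntry addNewSchemaEvolutionEntry(
            SchemaEvolutionEntry existing,
            List<String> addedColumns,
            List<String> deletedColumns) {
        // reuse the caller's entry if present; only a brand-new entry is stamped
        SchemaEvolutionEntry entry = (existing != null)
            ? existing
            : new SchemaEvolutionEntry(System.currentTimeMillis());
        entry.added.addAll(addedColumns);
        entry.removed.addAll(deletedColumns);
        return entry;
    }
}
```
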

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
##########
@@ -178,35 +200,51 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
             }
         }
       }
-      if (alterTableColRenameAndDataTypeChangeModel.isColumnRename) {
+
+      // read the latest schema file
+      val tableInfo: TableInfo = metaStore.getThriftTableInfo(carbonTable)
+      val columnSchemaList = tableInfo.fact_table.table_columns.asScala.filter(!_.isInvisible)
+      if (!alteredColumnNamesMap.isEmpty) {
         // validate the columns to be renamed
-        validColumnsForRenaming(carbonColumns, oldCarbonColumn.head, carbonTable)
+        validColumnsForRenaming(columnSchemaList, alteredColumnNamesMap, carbonTable)

Review comment:
       Do not use the thrift object here; please get the list of columns from the CarbonTable object.
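
The point here — take column metadata from the in-memory table object rather than deserializing the thrift `TableInfo` again — can be sketched with hypothetical stand-in types (this is not the CarbonData API):

```java
import java.util.List;
import java.util.stream.Collectors;

// Stand-in sketch: query the in-memory table representation directly,
// filtering out invisible columns, instead of re-reading serialized metadata.
public class TableColumns {
    static class Column {
        final String name;
        final boolean invisible;
        Column(String name, boolean invisible) {
            this.name = name;
            this.invisible = invisible;
        }
    }

    static List<String> visibleColumnNames(List<Column> tableColumns) {
        return tableColumns.stream()
                .filter(c -> !c.invisible)
                .map(c -> c.name)
                .collect(Collectors.toList());
    }
}
```
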



