Login  Register

[GitHub] [carbondata] akashrn5 commented on a change in pull request #4142: [CARBONDATA-4193] Fix compaction failure after alter add complex column.

Posted by GitBox on Jun 01, 2021; 1:32pm
URL: http://apache-carbondata-dev-mailing-list-archive.168.s1.nabble.com/GitHub-carbondata-ShreelekhyaG-opened-a-new-pull-request-4142-WIP-Fix-compaction-failure-after-alter-tp108434p108506.html


akashrn5 commented on a change in pull request #4142:
URL: https://github.com/apache/carbondata/pull/4142#discussion_r643095996



##########
File path: core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
##########
@@ -111,36 +123,72 @@ private void fillDictionaryKeyArrayBatchWithLatestSchema(List<Object[]> rows) {
   }
 
   /**
-   * This method will fill the no dictionary byte array with newly added no dictionary columns
+   * This method will fill the no dictionary and complex byte array with newly added columns
    *
    * @param rows
    * @return
    */
-  private void fillNoDictionaryKeyArrayBatchWithLatestSchema(List<Object[]> rows) {
+  private void fillNoDictionaryAndComplexKeyArrayBatchWithLatestSchema(List<Object[]> rows) {
     for (Object[] row : rows) {
       ByteArrayWrapper byteArrayWrapper = (ByteArrayWrapper) row[0];
       byte[][] noDictKeyArray = byteArrayWrapper.getNoDictionaryKeys();
+      byte[][] complexTypesKeyArray = byteArrayWrapper.getComplexTypesKeys();
       ProjectionDimension[] actualQueryDimensions = executionInfo.getActualQueryDimensions();
       byte[][] noDictionaryKeyArrayWithNewlyAddedColumns =
           new byte[noDictKeyArray.length + dimensionInfo.getNewNoDictionaryColumnCount()][];
+      byte[][] complexTypeKeyArrayWithNewlyAddedColumns =
+          new byte[complexTypesKeyArray.length + dimensionInfo.getNewComplexColumnCount()][];
       int existingColumnValueIndex = 0;
       int newKeyArrayIndex = 0;
+      int existingComplexColumnValueIndex = 0;
+      int newComplexKeyArrayIndex = 0;
       for (int i = 0; i < dimensionInfo.getDimensionExists().length; i++) {
         if (actualQueryDimensions[i].getDimension().getDataType() != DataTypes.DATE
             && !actualQueryDimensions[i].getDimension().hasEncoding(Encoding.IMPLICIT)) {
+          DataType currDataType = actualQueryDimensions[i].getDimension().getDataType();
           // if dimension exists then add the byte array value else add the default value
           if (dimensionInfo.getDimensionExists()[i]) {
-            noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] =
-                noDictKeyArray[existingColumnValueIndex++];
+            if (currDataType.isComplexType()) {
+              complexTypeKeyArrayWithNewlyAddedColumns[newComplexKeyArrayIndex++] =
+                  complexTypesKeyArray[existingComplexColumnValueIndex++];
+            } else {
+              noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] =
+                  noDictKeyArray[existingColumnValueIndex++];
+            }
           } else {
             byte[] newColumnDefaultValue = null;
             Object defaultValue = dimensionInfo.getDefaultValues()[i];
             if (null != defaultValue) {
               newColumnDefaultValue = (byte[]) defaultValue;
-            } else if (actualQueryDimensions[i].getDimension().getDataType() == DataTypes.STRING) {
+            } else if (currDataType == DataTypes.STRING) {
               newColumnDefaultValue =
                   DataTypeUtil.getDataTypeConverter().convertFromByteToUTF8Bytes(
                       CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
+            } else if (currDataType.isComplexType()) {
+              // Iterate over child dimensions and add its default value.
+              List<CarbonDimension> children =
+                  actualQueryDimensions[i].getDimension().getListOfChildDimensions();
+              ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+              DataOutputStream dataOutputStream = new DataOutputStream(byteStream);
+              try {
+                if (DataTypes.isArrayType(currDataType)) {
+                  dataOutputStream.writeInt(1);
+                } else if (DataTypes.isStructType(currDataType)) {
+                  dataOutputStream.writeShort(children.size());
+                }
+                for (int j = 0; j < children.size(); j++) {
+                  // update default null values based on datatype
+                  updateNullValue(dataOutputStream, children.get(j).getDataType());
+                }
+                dataOutputStream.close();
+              } catch (IOException e) {
+                LOGGER.error(e.getMessage(), e);
+                e.printStackTrace();

Review comment:
       please remove the printStackTrace

##########
File path: core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
##########
@@ -111,36 +123,72 @@ private void fillDictionaryKeyArrayBatchWithLatestSchema(List<Object[]> rows) {
   }
 
   /**
-   * This method will fill the no dictionary byte array with newly added no dictionary columns
+   * This method will fill the no dictionary and complex byte array with newly added columns
    *
    * @param rows
    * @return
    */
-  private void fillNoDictionaryKeyArrayBatchWithLatestSchema(List<Object[]> rows) {
+  private void fillNoDictionaryAndComplexKeyArrayBatchWithLatestSchema(List<Object[]> rows) {
     for (Object[] row : rows) {
       ByteArrayWrapper byteArrayWrapper = (ByteArrayWrapper) row[0];
       byte[][] noDictKeyArray = byteArrayWrapper.getNoDictionaryKeys();
+      byte[][] complexTypesKeyArray = byteArrayWrapper.getComplexTypesKeys();
       ProjectionDimension[] actualQueryDimensions = executionInfo.getActualQueryDimensions();
       byte[][] noDictionaryKeyArrayWithNewlyAddedColumns =
           new byte[noDictKeyArray.length + dimensionInfo.getNewNoDictionaryColumnCount()][];
+      byte[][] complexTypeKeyArrayWithNewlyAddedColumns =
+          new byte[complexTypesKeyArray.length + dimensionInfo.getNewComplexColumnCount()][];
       int existingColumnValueIndex = 0;
       int newKeyArrayIndex = 0;
+      int existingComplexColumnValueIndex = 0;
+      int newComplexKeyArrayIndex = 0;
       for (int i = 0; i < dimensionInfo.getDimensionExists().length; i++) {
         if (actualQueryDimensions[i].getDimension().getDataType() != DataTypes.DATE
             && !actualQueryDimensions[i].getDimension().hasEncoding(Encoding.IMPLICIT)) {
+          DataType currDataType = actualQueryDimensions[i].getDimension().getDataType();
           // if dimension exists then add the byte array value else add the default value
           if (dimensionInfo.getDimensionExists()[i]) {
-            noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] =
-                noDictKeyArray[existingColumnValueIndex++];
+            if (currDataType.isComplexType()) {
+              complexTypeKeyArrayWithNewlyAddedColumns[newComplexKeyArrayIndex++] =
+                  complexTypesKeyArray[existingComplexColumnValueIndex++];
+            } else {
+              noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] =
+                  noDictKeyArray[existingColumnValueIndex++];
+            }
           } else {
             byte[] newColumnDefaultValue = null;
             Object defaultValue = dimensionInfo.getDefaultValues()[i];
             if (null != defaultValue) {
               newColumnDefaultValue = (byte[]) defaultValue;
-            } else if (actualQueryDimensions[i].getDimension().getDataType() == DataTypes.STRING) {
+            } else if (currDataType == DataTypes.STRING) {
               newColumnDefaultValue =
                   DataTypeUtil.getDataTypeConverter().convertFromByteToUTF8Bytes(
                       CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
+            } else if (currDataType.isComplexType()) {
+              // Iterate over child dimensions and add its default value.
+              List<CarbonDimension> children =
+                  actualQueryDimensions[i].getDimension().getListOfChildDimensions();
+              ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+              DataOutputStream dataOutputStream = new DataOutputStream(byteStream);
+              try {
+                if (DataTypes.isArrayType(currDataType)) {
+                  dataOutputStream.writeInt(1);
+                } else if (DataTypes.isStructType(currDataType)) {
+                  dataOutputStream.writeShort(children.size());
+                }
+                for (int j = 0; j < children.size(); j++) {
+                  // update default null values based on datatype
+                  updateNullValue(dataOutputStream, children.get(j).getDataType());
+                }
+                dataOutputStream.close();
+              } catch (IOException e) {
+                LOGGER.error(e.getMessage(), e);
+                e.printStackTrace();
+              }
+              newColumnDefaultValue = byteStream.toByteArray();

Review comment:
       please close all the streams after use, either use try with resource or close in finally block

##########
File path: core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
##########
@@ -140,6 +141,9 @@
           if (queryDimension.getDimension().getDataType() == DataTypes.DATE) {
             dimensionInfo.setDictionaryColumnAdded(true);
             newDictionaryColumnCount++;
+          } else if (queryDimension.getDimension().getDataType().isComplexType()) {
+            dimensionInfo.setComplexColumnAdded(true);
+            newComplexDictColumnCount++;

Review comment:
       can rename to no dict column count

##########
File path: core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
##########
@@ -149,6 +197,27 @@ private void fillNoDictionaryKeyArrayBatchWithLatestSchema(List<Object[]> rows)
         }
       }
       byteArrayWrapper.setNoDictionaryKeys(noDictionaryKeyArrayWithNewlyAddedColumns);
+      byteArrayWrapper.setComplexTypesKeys(complexTypeKeyArrayWithNewlyAddedColumns);
+    }
+  }
+
+  private void updateNullValue(DataOutputStream dataOutputStream, DataType dimensionDataType)
+      throws IOException {
+    if (dimensionDataType == DataTypes.STRING

Review comment:
       please refactor this code and put in some core module util and use in both core and processing module




--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]