akashrn5 commented on a change in pull request #3865: URL: https://github.com/apache/carbondata/pull/3865#discussion_r469029220 ########## File path: integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala ########## @@ -154,13 +153,44 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach { sql("CREATE TABLE load32000chardata(dim1 String, dim2 String, mes1 int) STORED AS carbondata") sql("CREATE TABLE load32000chardata_dup(dim1 String, dim2 String, mes1 int) STORED AS carbondata") sql(s"LOAD DATA LOCAL INPATH '$testdata' into table load32000chardata OPTIONS('FILEHEADER'='dim1,dim2,mes1')") - intercept[Exception] { - sql("insert into load32000chardata_dup select dim1,concat(load32000chardata.dim2,'aaaa'),mes1 from load32000chardata").show() - } + checkAnswer(sql("select count(*) from load32000chardata"), Seq(Row(3))) + // String whilch length greater than 32000 will be considered as bad record and will be inserted as null in table Review comment: ```suggestion // String which length greater than 32000 will be considered as bad record and will be inserted as null in table ``` ########## File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ########## @@ -2456,4 +2456,10 @@ private CarbonCommonConstants() { * property which defines the insert stage flow */ public static final String IS_INSERT_STAGE = "is_insert_stage"; + + public static final String STRING_LENGTH_EXCEEDED_MESSAGE = + "Record %s of column %s exceeded " + MAX_CHARS_PER_COLUMN_DEFAULT + + " bytes. Please consider long string data type."; Review comment: ```suggestion " characters. Please consider long string data type."; ``` ########## File path: integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala ########## @@ -20,17 +20,16 @@ package org.apache.carbondata.integration.spark.testsuite.dataload import java.math.BigDecimal import scala.collection.mutable.ArrayBuffer - Review comment: remove unnecessary changes if not required ########## File path: integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala ########## @@ -20,17 +20,16 @@ package org.apache.carbondata.integration.spark.testsuite.dataload import java.math.BigDecimal import scala.collection.mutable.ArrayBuffer - import org.apache.spark.sql.Row import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterEach - Review comment: same as above ########## File path: processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java ########## @@ -330,21 +331,35 @@ public void writeByteArray(Object input, DataOutputStream dataOutputStream, } } + private byte[] getNullForBytes(byte[] value) { Review comment: i think this is not required, can reuse `updateNullValue`, please check ########## File path: processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java ########## @@ -330,21 +331,35 @@ public void writeByteArray(Object input, DataOutputStream dataOutputStream, } } + private byte[] getNullForBytes(byte[] value) { + String badRecordAction = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION); + if (badRecordAction.equalsIgnoreCase(CarbonCommonConstants.FORCE_BAD_RECORD_ACTION)) { + if (this.carbonDimension.getDataType() == DataTypes.STRING) { + return CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY; + } else { + return CarbonCommonConstants.EMPTY_BYTE_ARRAY; + } + } + return value; + } + private void checkAndWriteByteArray(Object input, DataOutputStream dataOutputStream, BadRecordLogHolder logHolder, Boolean isWithoutConverter, String parsedValue, byte[] value) throws IOException { if (isWithoutConverter) { if (this.carbonDimension.getDataType() == DataTypes.STRING && input instanceof String && ((String)input).length() > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) { - throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed " - + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes"); + logHolder.setReason(String.format(CarbonCommonConstants.STRING_LENGTH_EXCEEDED_MESSAGE, + input.toString(), this.carbonDimension.getColName())); + value = getNullForBytes(value); } updateValueToByteStream(dataOutputStream, value); } else { if (this.carbonDimension.getDataType() == DataTypes.STRING && value.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) { - throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed " - + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes"); + logHolder.setReason(String.format(CarbonCommonConstants.STRING_LENGTH_EXCEEDED_MESSAGE, Review comment: i think here after updating bad record reason, you should update null value to stream ########## File path: integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala ########## @@ -154,13 +153,44 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach { sql("CREATE TABLE load32000chardata(dim1 String, dim2 String, mes1 int) STORED AS carbondata") sql("CREATE TABLE load32000chardata_dup(dim1 String, dim2 String, mes1 int) STORED AS carbondata") sql(s"LOAD DATA LOCAL INPATH '$testdata' into table load32000chardata OPTIONS('FILEHEADER'='dim1,dim2,mes1')") - intercept[Exception] { - sql("insert into load32000chardata_dup select dim1,concat(load32000chardata.dim2,'aaaa'),mes1 from load32000chardata").show() - } + checkAnswer(sql("select count(*) from load32000chardata"), Seq(Row(3))) + // String whilch length greater than 32000 will be considered as bad record and will be inserted as null in table + sql("insert into load32000chardata_dup select dim1,concat(load32000chardata.dim2,'aaaa'),mes1 from load32000chardata").show() + checkAnswer(sql("select count(*) from load32000chardata_dup"), Seq(Row(3))) + checkAnswer(sql("select * from load32000chardata_dup where mes1=3"), Seq(Row("32000", null, 3))) sql(s"LOAD DATA LOCAL INPATH '$testdata' into table load32000chardata_dup OPTIONS('FILEHEADER'='dim1,dim2,mes1')") + checkAnswer(sql("select count(*) from load32000chardata_dup"), Seq(Row(6))) + // Update strings of length greater than 32000 will invalidate the whole row. + sql("update load32000chardata_dup set(load32000chardata_dup.dim2)=(select concat(load32000chardata.dim2,'aaaa') " + + "from load32000chardata where load32000chardata.mes1=3) where load32000chardata_dup.mes1=3").show() + checkAnswer(sql("select count(*) from load32000chardata_dup"), Seq(Row(6))) + checkAnswer(sql("select * from load32000chardata_dup where mes1=3"), Seq(Row("32000", null, 3), Row("32000", null, 3))) + + val longChar: String = RandomStringUtils.randomAlphabetic(33000) + // BAD_RECORD_ACTION = "REDIRECT" + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "REDIRECT"); + sql(s"insert into load32000chardata_dup values('32000', '$longChar', 3)") + checkAnswer(sql("select count(*) from load32000chardata_dup"), Seq(Row(6))) + checkAnswer(sql("select * from load32000chardata_dup where mes1=3"), Seq(Row("32000", null, 3), Row("32000", null, 3))) + + // BAD_RECORD_ACTION = "IGNORE" + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "IGNORE"); + sql(s"insert into load32000chardata_dup values('32000', '$longChar', 3)") + checkAnswer(sql("select count(*) from load32000chardata_dup"), Seq(Row(6))) + checkAnswer(sql("select * from load32000chardata_dup where mes1=3"), Seq(Row("32000", null, 3), Row("32000", null, 3))) + + // BAD_RECORD_ACTION = "FAIL" + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL"); intercept[Exception] { - sql("update load32000chardata_dup set(load32000chardata_dup.dim2)=(select concat(load32000chardata.dim2,'aaaa') from load32000chardata)").show() + sql(s"insert into load32000chardata_dup values('32000', '$longChar', 3)") } + checkAnswer(sql("select count(*) from load32000chardata_dup"), Seq(Row(6))) + checkAnswer(sql("select * from load32000chardata_dup where mes1=3"), Seq(Row("32000", null, 3), Row("32000", null, 3))) + CarbonProperties.getInstance() Review comment: you should change property anywhere ,better to do in beforeall and reset in afterall, please follow this. ########## File path: processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java ########## @@ -330,21 +331,35 @@ public void writeByteArray(Object input, DataOutputStream dataOutputStream, } } + private byte[] getNullForBytes(byte[] value) { + String badRecordAction = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION); + if (badRecordAction.equalsIgnoreCase(CarbonCommonConstants.FORCE_BAD_RECORD_ACTION)) { + if (this.carbonDimension.getDataType() == DataTypes.STRING) { + return CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY; + } else { + return CarbonCommonConstants.EMPTY_BYTE_ARRAY; + } + } + return value; + } + private void checkAndWriteByteArray(Object input, DataOutputStream dataOutputStream, BadRecordLogHolder logHolder, Boolean isWithoutConverter, String parsedValue, byte[] value) throws IOException { if (isWithoutConverter) { if (this.carbonDimension.getDataType() == DataTypes.STRING && input instanceof String && ((String)input).length() > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) { - throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed " - + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes"); + logHolder.setReason(String.format(CarbonCommonConstants.STRING_LENGTH_EXCEEDED_MESSAGE, + input.toString(), this.carbonDimension.getColName())); + value = getNullForBytes(value); Review comment: same as above ########## File path: integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala ########## @@ -154,13 +153,44 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach { sql("CREATE TABLE load32000chardata(dim1 String, dim2 String, mes1 int) STORED AS carbondata") sql("CREATE TABLE load32000chardata_dup(dim1 String, dim2 String, mes1 int) STORED AS carbondata") sql(s"LOAD DATA LOCAL INPATH '$testdata' into table load32000chardata OPTIONS('FILEHEADER'='dim1,dim2,mes1')") - intercept[Exception] { - sql("insert into load32000chardata_dup select dim1,concat(load32000chardata.dim2,'aaaa'),mes1 from load32000chardata").show() Review comment: here i cannot see any bad record action so by default it should be fail, why its force, please check and unset if some previous test case failed to do so ########## File path: integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala ########## @@ -154,13 +153,44 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach { sql("CREATE TABLE load32000chardata(dim1 String, dim2 String, mes1 int) STORED AS carbondata") sql("CREATE TABLE load32000chardata_dup(dim1 String, dim2 String, mes1 int) STORED AS carbondata") sql(s"LOAD DATA LOCAL INPATH '$testdata' into table load32000chardata OPTIONS('FILEHEADER'='dim1,dim2,mes1')") - intercept[Exception] { - sql("insert into load32000chardata_dup select dim1,concat(load32000chardata.dim2,'aaaa'),mes1 from load32000chardata").show() - } + checkAnswer(sql("select count(*) from load32000chardata"), Seq(Row(3))) + // String whilch length greater than 32000 will be considered as bad record and will be inserted as null in table + sql("insert into load32000chardata_dup select dim1,concat(load32000chardata.dim2,'aaaa'),mes1 from load32000chardata").show() + checkAnswer(sql("select count(*) from load32000chardata_dup"), Seq(Row(3))) + checkAnswer(sql("select * from load32000chardata_dup where mes1=3"), Seq(Row("32000", null, 3))) sql(s"LOAD DATA LOCAL INPATH '$testdata' into table load32000chardata_dup OPTIONS('FILEHEADER'='dim1,dim2,mes1')") + checkAnswer(sql("select count(*) from load32000chardata_dup"), Seq(Row(6))) + // Update strings of length greater than 32000 will invalidate the whole row. + sql("update load32000chardata_dup set(load32000chardata_dup.dim2)=(select concat(load32000chardata.dim2,'aaaa') " + + "from load32000chardata where load32000chardata.mes1=3) where load32000chardata_dup.mes1=3").show() + checkAnswer(sql("select count(*) from load32000chardata_dup"), Seq(Row(6))) + checkAnswer(sql("select * from load32000chardata_dup where mes1=3"), Seq(Row("32000", null, 3), Row("32000", null, 3))) + + val longChar: String = RandomStringUtils.randomAlphabetic(33000) + // BAD_RECORD_ACTION = "REDIRECT" + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "REDIRECT"); Review comment: for redirect please check the redirect value for assert ########## File path: processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java ########## @@ -301,8 +301,9 @@ public void writeByteArray(Object input, DataOutputStream dataOutputStream, } if (this.carbonDimension.getDataType() == DataTypes.STRING && value.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) { - throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed " - + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes"); + logHolder.setReason(String.format(CarbonCommonConstants.STRING_LENGTH_EXCEEDED_MESSAGE, Review comment: i think can reuse `updateNullValue` method ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala ########## @@ -194,11 +194,10 @@ class VarcharDataTypesBasicTestCase extends QueryTest with BeforeAndAfterEach wi // query should pass checkAnswer(sql("select * from testlongstring"), Seq(Row(1, "ab", "cool"), Row(1, "ab1", longChar), Row(1, "abc", longChar))) - // insert long string should fail as unset is done Review comment: this change is not required, please find who is setting as force and forgot to unset ########## File path: processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java ########## @@ -82,21 +83,25 @@ public Object convert(Object value, BadRecordLogHolder logHolder) .getBytesBasedOnDataTypeForNoDictionaryColumn(dimensionValue, dataType, dateFormat); if (dataType == DataTypes.STRING && parsedValue.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) { - throw new CarbonDataLoadingException(String.format( - "Dataload failed, String size cannot exceed %d bytes," - + " please consider long string data type", - CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT)); + logHolder.setReason(CarbonCommonConstants.STRING_LENGTH_EXCEEDED_MESSAGE); + String badRecordAction = CarbonProperties.getInstance() Review comment: here no need to check the bad records action, just return `null` value for it should handle in `rowConverterImpl` ########## File path: processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java ########## @@ -82,21 +83,25 @@ public Object convert(Object value, BadRecordLogHolder logHolder) .getBytesBasedOnDataTypeForNoDictionaryColumn(dimensionValue, dataType, dateFormat); if (dataType == DataTypes.STRING && parsedValue.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) { - throw new CarbonDataLoadingException(String.format( - "Dataload failed, String size cannot exceed %d bytes," - + " please consider long string data type", - CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT)); + logHolder.setReason(CarbonCommonConstants.STRING_LENGTH_EXCEEDED_MESSAGE); + String badRecordAction = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION); + if (badRecordAction.equalsIgnoreCase(CarbonCommonConstants.FORCE_BAD_RECORD_ACTION)) { + parsedValue = getNullValue(); + } } return parsedValue; } else { Object parsedValue = DataTypeUtil .getDataDataTypeForNoDictionaryColumn(dimensionValue, dataType, dateFormat); if (dataType == DataTypes.STRING && parsedValue.toString().length() > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) { - throw new CarbonDataLoadingException(String.format( - "Dataload failed, String size cannot exceed %d bytes," - + " please consider long string data type", - CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT)); + logHolder.setReason(CarbonCommonConstants.STRING_LENGTH_EXCEEDED_MESSAGE); + if (CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION) Review comment: same as above, check if it handles for all the bad records action ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
Free forum by Nabble | Edit this page |