GitHub user Indhumathi27 opened a pull request:
https://github.com/apache/carbondata/pull/2671 [WIP]AVRO datatype support through SDK This PR supports following Avro DataTypes to carbon format through SDK. Avro datatypes include, 1. Avro Union 2. Avro Enum 3. Avro Logical type Decimal Please refer JIRA CARBONDATA-2876 for further detail. - [ ] Any interfaces changed? - [ ] Any backward compatibility impacted? - [ ] Document update required? - [x] Testing done Test file has been added - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. You can merge this pull request into a Git repository by running: $ git pull https://github.com/Indhumathi27/carbondata avro_support_sdk Alternatively you can review and apply these changes as the patch at: https://github.com/apache/carbondata/pull/2671.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2671 ---- commit eae87521707fa86337a45d5677cc7d0f1f1fbfbc Author: Indhumathi27 <indhumathim27@...> Date: 2018-08-29T14:18:21Z Support Avro datatype conversion through SDK ---- --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2671 SDV Build Failed, please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6462/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2671 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8149/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2671 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/78/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2671 SDV Build Failed, please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6466/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2671 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8153/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2671 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/82/ --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2671#discussion_r213916136 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java --- @@ -213,6 +218,225 @@ private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) { } out = new ArrayObject(arrayChildObjects); break; + case BYTES: + // DECIMAL type is defined in Avro as a BYTE type with the logicalType property + // set to "decimal" and a specified precision and scale + if (logicalType instanceof LogicalTypes.Decimal) { + out = new BigDecimal(new String(((ByteBuffer) fieldValue).array(), + CarbonCommonConstants.DEFAULT_CHARSET_CLASS)); + ; + } else { + out = fieldValue; + } + break; + case UNION: + List<Schema> fieldsUnion = avroField.schema().getTypes(); + int countIfNotNull = 0; + for (Schema unionField : fieldsUnion) { + if (!unionField.getType().equals(Schema.Type.NULL)) { + countIfNotNull++; + } + } + Object[] values; + values = new Object[countIfNotNull]; --- End diff -- 1. Rename countIfNotNull to notNullUnionFieldsCount 2. merge above 2 lines 'Object[] values = new Object[countIfNotNull];' 3. Check union behavior for only NULL type and handle if any special handling is required --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2671#discussion_r213914935 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java --- @@ -213,6 +218,225 @@ private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) { } out = new ArrayObject(arrayChildObjects); break; + case BYTES: + // DECIMAL type is defined in Avro as a BYTE type with the logicalType property + // set to "decimal" and a specified precision and scale + if (logicalType instanceof LogicalTypes.Decimal) { + out = new BigDecimal(new String(((ByteBuffer) fieldValue).array(), + CarbonCommonConstants.DEFAULT_CHARSET_CLASS)); + ; --- End diff -- Remove this semi-colon --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2671#discussion_r213922678 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java --- @@ -213,6 +218,225 @@ private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) { } out = new ArrayObject(arrayChildObjects); break; + case BYTES: + // DECIMAL type is defined in Avro as a BYTE type with the logicalType property + // set to "decimal" and a specified precision and scale + if (logicalType instanceof LogicalTypes.Decimal) { + out = new BigDecimal(new String(((ByteBuffer) fieldValue).array(), + CarbonCommonConstants.DEFAULT_CHARSET_CLASS)); + ; + } else { + out = fieldValue; + } + break; + case UNION: + List<Schema> fieldsUnion = avroField.schema().getTypes(); + int countIfNotNull = 0; + for (Schema unionField : fieldsUnion) { + if (!unionField.getType().equals(Schema.Type.NULL)) { + countIfNotNull++; + } + } + Object[] values; + values = new Object[countIfNotNull]; + int j = 0; + for (Schema unionField : fieldsUnion) { + if (!unionField.getType().equals(Schema.Type.NULL)) { + values[j] = avroFieldToObjectForUnionType(unionField, fieldValue, avroField); --- End diff -- 1. Try to reuse the code as much as possible. You can extract the primitives types computation in a separate method and override only complex types as different methods for union and rest of the data types 2. Write a function for union type to identify the schema for which the computation need to be done. Do not call the computation for all the union types as at one time only one type of value will exists --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2671#discussion_r213915304 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java --- @@ -213,6 +218,225 @@ private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) { } out = new ArrayObject(arrayChildObjects); break; + case BYTES: + // DECIMAL type is defined in Avro as a BYTE type with the logicalType property + // set to "decimal" and a specified precision and scale + if (logicalType instanceof LogicalTypes.Decimal) { + out = new BigDecimal(new String(((ByteBuffer) fieldValue).array(), + CarbonCommonConstants.DEFAULT_CHARSET_CLASS)); + ; + } else { + out = fieldValue; --- End diff -- The else case handling is not required, as the Binary data type is not supported. Instead, write a proper comment stating that only the decimal logical type is supported for the Avro Byte data type. Once the binary data type is supported, we can add the else block. --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on the issue:
https://github.com/apache/carbondata/pull/2671 @Indhumathi27 ..please modify the code as per the comments; then we can continue with further review of the code --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2671 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8182/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2671 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/111/ --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2671#discussion_r214024597 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java --- @@ -310,6 +503,31 @@ private static Field prepareFields(Schema.Field avroField) { } else { return null; } + case UNION: + int i = 0; + // Get union types and store as Struct<type> + ArrayList<StructField> unionFields = new ArrayList<>(); + for (Schema avroSubField : avroField.schema().getTypes()) { + StructField unionField = prepareSubFields(avroField.name() + i++, avroSubField); + if (unionField != null) { + unionFields.add(unionField); + } + } + if (unionFields.size() != 0) { --- End diff -- replace 'unionFields.size()' with 'unionFields.isEmpty()' --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2671#discussion_r214022747 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java --- @@ -213,6 +181,126 @@ private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) { } out = new ArrayObject(arrayChildObjects); break; + case UNION: + // Union type will be internally stored as Struct<col:type> + // Fill data object only if fieldvalue is instance of datatype + // For other field datatypes, fill value as Null + List<Schema> unionFields = avroField.schema().getTypes(); + int notNullUnionFieldsCount = 0; + for (Schema unionField : unionFields) { + if (!unionField.getType().equals(Schema.Type.NULL)) { + notNullUnionFieldsCount++; + } + } + Object[] values = new Object[notNullUnionFieldsCount]; + int j = 0; + for (Schema unionField : unionFields) { + if (!unionField.getType().equals(Schema.Type.NULL)) { + if (checkFieldValueType(unionField.getType(), fieldValue)) { + values[j] = avroFieldToObjectForUnionType(unionField, fieldValue, avroField); + } else { + values[j] = null; + } --- End diff -- 1. Remove else block 2. Combine above 2 if conditions into 1 using && operator 3. break the loop once if check is success --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2671#discussion_r214023293 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java --- @@ -213,6 +181,126 @@ private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) { } out = new ArrayObject(arrayChildObjects); break; + case UNION: + // Union type will be internally stored as Struct<col:type> + // Fill data object only if fieldvalue is instance of datatype + // For other field datatypes, fill value as Null + List<Schema> unionFields = avroField.schema().getTypes(); + int notNullUnionFieldsCount = 0; + for (Schema unionField : unionFields) { + if (!unionField.getType().equals(Schema.Type.NULL)) { + notNullUnionFieldsCount++; + } + } + Object[] values = new Object[notNullUnionFieldsCount]; + int j = 0; + for (Schema unionField : unionFields) { + if (!unionField.getType().equals(Schema.Type.NULL)) { + if (checkFieldValueType(unionField.getType(), fieldValue)) { + values[j] = avroFieldToObjectForUnionType(unionField, fieldValue, avroField); + } else { + values[j] = null; + } + j++; + } + } + out = new StructObject(values); + break; + default: + out = avroPrimitiveFieldToObject(type, logicalType, fieldValue); + } + return out; + } + + /** + * For Union type, fill data if Schema.Type is instance of fieldValue + * and return result + * + * @param type + * @param fieldValue + * @return + */ + private boolean checkFieldValueType(Schema.Type type, Object fieldValue) { + switch (type) { + case INT: + return (fieldValue instanceof Integer); + case BOOLEAN: + return (fieldValue instanceof Boolean); + case LONG: + return (fieldValue instanceof Long); + case DOUBLE: + return (fieldValue instanceof Double); + case STRING: + return (fieldValue instanceof Utf8 || fieldValue instanceof String); + case FLOAT: + return (fieldValue instanceof Float); + case RECORD: + return (fieldValue instanceof GenericData.Record); + case ARRAY: + return (fieldValue instanceof GenericData.Array || fieldValue 
instanceof ArrayList); + case BYTES: + return (fieldValue instanceof ByteBuffer); + case MAP: + return (fieldValue instanceof HashMap); + case ENUM: + return (fieldValue instanceof GenericData.EnumSymbol); + default: + return false; + } + } + + private Object avroPrimitiveFieldToObject(Schema.Type type, LogicalType logicalType, + Object fieldValue) { + Object out; + switch (type) { + case INT: + if (logicalType != null) { + if (logicalType instanceof LogicalTypes.Date) { + int dateIntValue = (int) fieldValue; + out = dateIntValue * DateDirectDictionaryGenerator.MILLIS_PER_DAY; + } else { + LOGGER.warn("Actual type: INT, Logical Type: " + logicalType.getName()); + out = fieldValue; + } + } else { + out = fieldValue; + } + break; + case BOOLEAN: + case LONG: + if (logicalType != null && !(logicalType instanceof LogicalTypes.TimestampMillis)) { + if (logicalType instanceof LogicalTypes.TimestampMicros) { + long dateIntValue = (long) fieldValue; + out = dateIntValue / 1000L; + } else { + LOGGER.warn("Actual type: INT, Logical Type: " + logicalType.getName()); + out = fieldValue; + } + } else { + out = fieldValue; + } + break; + case DOUBLE: + case STRING: + case ENUM: + out = fieldValue; + break; + case FLOAT: + // direct conversion will change precision. So parse from string. + // also carbon internally needs float as double + out = Double.parseDouble(fieldValue.toString()); + break; + case BYTES: + // DECIMAL type is defined in Avro as a BYTE type with the logicalType property + // set to "decimal" and a specified precision and scale + if (logicalType instanceof LogicalTypes.Decimal) { + out = new BigDecimal(new String(((ByteBuffer) fieldValue).array(), + CarbonCommonConstants.DEFAULT_CHARSET_CLASS)); + } else { + // As binary type is not supported yet, fill value as null + out = null; --- End diff -- Remove this else block --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2671#discussion_r214024187 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java --- @@ -310,6 +503,31 @@ private static Field prepareFields(Schema.Field avroField) { } else { return null; } + case UNION: + int i = 0; + // Get union types and store as Struct<type> + ArrayList<StructField> unionFields = new ArrayList<>(); + for (Schema avroSubField : avroField.schema().getTypes()) { + StructField unionField = prepareSubFields(avroField.name() + i++, avroSubField); --- End diff -- check for NULL schema here --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2671 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8188/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2671 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/117/ --- |
Free forum by Nabble | Edit this page |