GitHub user sounakr opened a pull request:
https://github.com/apache/carbondata/pull/2276

[CARBONDATA-2443][WIP][SDK] Multi level complex type support for AVRO based SDK

Multi level complex type support for AVRO based SDK

- [ ] Any interfaces changed?
- [ ] Any backward compatibility impacted?
- [ ] Document update required?
- [ ] Testing done
      Please provide details on
      - Whether new unit test cases have been added or why no new tests are required?
      - How it is tested? Please attach test report.
      - Is it a performance related change? Please attach the performance test report.
      - Any additional information to help reviewers in testing this change.
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sounakr/incubator-carbondata multi_level_complex

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2276.patch

To close this pull request, make a commit to your master/trunk branch with (at least)
the following in the commit message:

    This closes #2276

----
commit 29ac4474d28ccb08c9a5cff52e973da2dbbd5c4b
Author: kumarvishal09 <kumarvishal1802@...>
Date:   2018-05-03T15:29:18Z

    Fixed No dictionary complex type issues

commit f357202cd897e004c41d378d764247008717e182
Author: sounakr <sounakr@...>
Date:   2018-05-02T15:59:57Z

    [CARBONDATA-2430] Fixes Related to Complex Type Support with No Dictionary

    a) Reshuffling of fields. Sorting of columns based on SORT -> DIMENSION -> COMPLEX -> MEASURE.
    b) Enable Complex Type NoDictionary Creation from Create Table DDL.
    c) ARRAY Support in AVRO.
    d) Minor Fixes related to AVRO SDK support.

commit e8cbdc5ef0d7634fc60a156fb8049567fdc6e9ec
Author: ajantha-bhat <ajanthabhat@...>
Date:   2018-05-04T11:35:51Z

    sdk complex type issue fixes and test case additions

commit 1867b600b40ef7e4ef4e8cde28f31dc32865dd07
Author: ajantha-bhat <ajanthabhat@...>
Date:   2018-05-04T16:57:26Z

    Struct of array test case

commit 5bf8067f1f4c583f14e5af3ef25a794af6b1ff1c
Author: sounakr <sounakr@...>
Date:   2018-05-07T01:21:54Z

    Multi Level Complex type Support For AVRO Based SDK

----

---
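Editor's note: the column reshuffling described in commit f357202c (SORT -> DIMENSION -> COMPLEX -> MEASURE) amounts to sorting the schema's columns by category before the load. A minimal sketch of that ordering; the `Field` class and its category names below are hypothetical stand-ins for illustration, not CarbonData's actual loader types:

    import java.util.Arrays;
    import java.util.Comparator;

    // Hypothetical field descriptor, only to illustrate the ordering;
    // CarbonData's loader works on its own DataField/CarbonColumn types.
    class Field {
        enum Category { SORT_COLUMN, DIMENSION, COMPLEX, MEASURE }
        final String name;
        final Category category;
        Field(String name, Category category) { this.name = name; this.category = category; }
    }

    public class FieldOrderSketch {
        public static void main(String[] args) {
            Field[] fields = {
                new Field("price", Field.Category.MEASURE),
                new Field("address", Field.Category.COMPLEX),   // e.g. a struct/array column
                new Field("id", Field.Category.SORT_COLUMN),
                new Field("city", Field.Category.DIMENSION),
            };
            // Sort by the category ordinal: SORT -> DIMENSION -> COMPLEX -> MEASURE.
            Arrays.sort(fields, Comparator.comparingInt(f -> f.category.ordinal()));
            for (Field f : fields) {
                System.out.println(f.category + " : " + f.name);
            }
        }
    }

Running this prints the columns in SORT_COLUMN, DIMENSION, COMPLEX, MEASURE order regardless of how the schema declared them.

---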
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2276

Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4516/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2276

Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5676/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2276

SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4753/

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2276#discussion_r186336351

--- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java ---
@@ -76,6 +78,19 @@ public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String[] s
     }
   }
 
+  private AbstractDataLoadProcessorStep buildInternalForAvroSDKLoad(CarbonIterator[] inputIterators,
--- End diff --

This logic is the same as `buildInternalForPartitionLoad`, so why are a special method and special handling required? Why not use the same partition flow?

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2276#discussion_r186336455

--- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java ---
@@ -202,6 +202,13 @@
    */
   private boolean isPartitionLoad;
 
+  /**
+   * It directly writes data directly to nosort processor bypassing convert steps.
+   * This data comes from SDK where the input is from AVRO and AVRO Writer already
+   * converts the AVRO data to the Primitive datatypes object.
+   */
+  private boolean isAvroSDKLoad;
--- End diff --

I don't think this is required; use the partition flow.

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2276#discussion_r186338238

--- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java ---
@@ -220,32 +241,97 @@ private CarbonRowBatch getBatch() {
       CarbonRowBatch carbonRowBatch = new CarbonRowBatch(batchSize);
       int count = 0;
       while (internalHasNext() && count < batchSize) {
-        carbonRowBatch.addRow(new CarbonRow(convertToNoDictionaryToBytes(currentIterator.next())));
+        carbonRowBatch.addRow(
+            new CarbonRow(convertToNoDictionaryToBytes(currentIterator.next(), dataFields)));
         count++;
       }
       rowCounter.getAndAdd(carbonRowBatch.getSize());
       return carbonRowBatch;
     }
 
-    private Object[] convertToNoDictionaryToBytes(Object[] data) {
+    private Object[] convertToNoDictionaryToBytes(Object[] data, DataField[] dataFields) {
       Object[] newData = new Object[data.length];
-      for (int i = 0; i < noDictionaryMapping.length; i++) {
-        if (noDictionaryMapping[i]) {
+      for (int i = 0; i < data.length; i++) {
+        if (i < noDictionaryMapping.length && noDictionaryMapping[i]) {
           newData[i] = DataTypeUtil
               .getBytesDataDataTypeForNoDictionaryColumn(data[orderOfData[i]], dataTypes[i]);
         } else {
-          newData[i] = data[orderOfData[i]];
+          // if this is a complex column then recursively comver the data into Byte Array.
+          if (dataTypes[i].isComplexType()) {
+            ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
+            DataOutputStream dataOutputStream = new DataOutputStream(byteArray);
+            try {
+              getBytesForComplex(data[orderOfData[i]], dataFields[i], dataOutputStream);
+              dataOutputStream.close();
+              newData[i] = byteArray.toByteArray();
+            } catch (Exception e) {
+              throw new CarbonDataLoadingException( "Loading Exception", e);
+            }
+          } else {
+            newData[i] = data[orderOfData[i]];
+          }
         }
       }
-      if (newData.length > noDictionaryMapping.length) {
-        for (int i = noDictionaryMapping.length; i < newData.length; i++) {
-          newData[i] = data[orderOfData[i]];
+      System.out.println(Arrays.toString(data));
+      return newData;
+    }
+
+    private void getBytesForComplex(Object datum, DataField dataField,
+        DataOutputStream dataOutputStream) throws Exception {
+      getBytesForComplex(datum, dataField.getColumn().getDataType(),
+          ((CarbonDimension) dataField.getColumn()), dataOutputStream);
+    }
+
+    private void getBytesForComplex(Object datum, DataType dataType,
+        CarbonDimension carbonDimensions, DataOutputStream dataOutputStream) throws Exception {
+      if (dataType.getName().equalsIgnoreCase("struct")) {
+        List<CarbonDimension> childStructDimensions = carbonDimensions.getListOfChildDimensions();
+        int size = childStructDimensions.size();
+        Object[] structFields = ((StructObject) datum).getData();
+        dataOutputStream.writeInt(size);
+        // loop through each one of them.
+        for (int i = 0; i < size; i++) {
+          getBytesForComplex(structFields[i], childStructDimensions.get(i).getDataType(),
+              childStructDimensions.get(i), dataOutputStream);
+        }
+      } else if (dataType.getName().equalsIgnoreCase("array")) {
--- End diff --

Check with `instanceof` rather than a string comparison.

---
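Editor's note on the string-comparison point above: matching type names with `equalsIgnoreCase` ties the dispatch to a spelling and fails only at runtime, while a type test is checked by the compiler. A minimal sketch of the contrast, using hypothetical `DataType` subclasses rather than CarbonData's real ones:

    // Hypothetical DataType hierarchy, only to contrast the two styles of check.
    abstract class DataType {
        abstract String getName();
    }

    class StructType extends DataType {
        String getName() { return "struct"; }
    }

    class ArrayType extends DataType {
        String getName() { return "array"; }
    }

    public class TypeCheckSketch {
        static String describe(DataType dataType) {
            // Fragile: behavior depends on the spelling of a name, needs
            // equalsIgnoreCase to be safe, and a typo only fails at runtime:
            //   if (dataType.getName().equalsIgnoreCase("struct")) { ... }

            // Preferred: the compiler verifies the type, and the check
            // survives renames and refactors.
            if (dataType instanceof StructType) {
                return "struct";
            } else if (dataType instanceof ArrayType) {
                return "array";
            }
            return "primitive";
        }

        public static void main(String[] args) {
            System.out.println(describe(new ArrayType())); // prints "array"
        }
    }

---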
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2276#discussion_r186338913

--- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java ---
+    private void getBytesForComplex(Object datum, DataType dataType,
--- End diff --

You should use `GenericDataType` to write the data. Don't duplicate code.

---
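Editor's note: the `GenericDataType` suggestion above is about ownership of the recursion. If each complex type node serializes itself and delegates to its children, the writeInt(size)-then-recurse logic shown in the diff lives in one place and every caller reuses it. A sketch of that shape under hypothetical names (`ComplexTypeWriter` and its `writeByteArray` are illustrative stand-ins, not CarbonData's actual `GenericDataType` API):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.List;

    // One polymorphic hierarchy owns the serialization; callers never re-implement it.
    interface ComplexTypeWriter {
        void writeByteArray(Object value, DataOutputStream out) throws IOException;
    }

    class StructWriter implements ComplexTypeWriter {
        private final List<ComplexTypeWriter> children;
        StructWriter(List<ComplexTypeWriter> children) { this.children = children; }
        @Override public void writeByteArray(Object value, DataOutputStream out) throws IOException {
            Object[] fields = (Object[]) value;
            out.writeInt(children.size()); // child count, mirroring writeInt(size) in the diff
            for (int i = 0; i < children.size(); i++) {
                children.get(i).writeByteArray(fields[i], out);
            }
        }
    }

    class ArrayWriter implements ComplexTypeWriter {
        private final ComplexTypeWriter element;
        ArrayWriter(ComplexTypeWriter element) { this.element = element; }
        @Override public void writeByteArray(Object value, DataOutputStream out) throws IOException {
            Object[] items = (Object[]) value;
            out.writeInt(items.length); // element count, then each element recursively
            for (Object item : items) {
                element.writeByteArray(item, out);
            }
        }
    }

    class StringWriter implements ComplexTypeWriter {
        @Override public void writeByteArray(Object value, DataOutputStream out) throws IOException {
            byte[] bytes = String.valueOf(value).getBytes(StandardCharsets.UTF_8);
            out.writeShort(bytes.length); // length-prefixed UTF-8, a common no-dictionary layout
            out.write(bytes);
        }
    }

    public class ComplexWriterSketch {
        public static void main(String[] args) throws IOException {
            // struct<name: string, tags: array<string>>
            ComplexTypeWriter writer = new StructWriter(
                Arrays.asList(new StringWriter(), new ArrayWriter(new StringWriter())));
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            writer.writeByteArray(new Object[] {"bob", new Object[] {"x", "y"}}, out);
            out.close();
            // 4 (child count) + 2+3 ("bob") + 4 (array length) + 2+1 + 2+1 = 19 bytes
            System.out.println(buffer.size() + " bytes");
        }
    }

---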
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2276

Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4531/

---
Github user sounakr commented on the issue:
https://github.com/apache/carbondata/pull/2276

Retest this please

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2276

Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5691/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2276

Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5695/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2276

Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4535/

---
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2276#discussion_r186388685

--- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java ---
+      } else if (dataType.getName().equalsIgnoreCase("array")) {
--- End diff --

Done

---
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2276#discussion_r186388739 --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java --- @@ -202,6 +202,13 @@ */ private boolean isPartitionLoad; + /** + * It directly writes data directly to nosort processor bypassing convert steps. + * This data comes from SDK where the input is from AVRO and AVRO Writer already + * converts the AVRO data to the Primitive datatypes object. + */ + private boolean isAvroSDKLoad; --- End diff -- Done. --- |
Github user sounakr commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2276#discussion_r186388851 --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java --- @@ -76,6 +78,19 @@ public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String[] s } } + private AbstractDataLoadProcessorStep buildInternalForAvroSDKLoad(CarbonIterator[] inputIterators, --- End diff -- Both of Partition and AVRO calling the same method now. --- |
Github user sounakr commented on the issue:
https://github.com/apache/carbondata/pull/2276 Retest this please --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2276

Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5702/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2276

Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4542/

---
Github user sounakr commented on the issue:
https://github.com/apache/carbondata/pull/2276

Retest this please

---