Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3029#discussion_r244714375

--- Diff: processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java ---
@@ -400,24 +417,53 @@ private static int getDimensionDefaultCardinality(CarbonDimension dimension) {
    * @param tableLastUpdatedTime
    * @return
    */
-  public static boolean checkIfAnyRestructuredBlockExists(Map<String, TaskBlockInfo> segmentMapping,
-      Map<String, List<DataFileFooter>> dataFileMetadataSegMapping, long tableLastUpdatedTime) {
-    boolean restructuredBlockExists = false;
-    for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) {
-      String segmentId = taskMap.getKey();
+  public static boolean checkIfAnyRestructuredBlockExists(
+      Map<String, TaskBlockInfo> segmentMapping,
+      Map<String, List<DataFileFooter>> dataFileMetadataSegMapping,
+      long tableLastUpdatedTime) {
+
+    for (Map.Entry<String, TaskBlockInfo> segmentEntry : segmentMapping.entrySet()) {
+      String segmentId = segmentEntry.getKey();
       List<DataFileFooter> listMetadata = dataFileMetadataSegMapping.get(segmentId);
-      for (DataFileFooter dataFileFooter : listMetadata) {
-        // if schema modified timestamp is greater than footer stored schema timestamp,
-        // it indicates it is a restructured block
-        if (tableLastUpdatedTime > dataFileFooter.getSchemaUpdatedTimeStamp()) {
-          restructuredBlockExists = true;
-          break;
-        }
+
+      if (isRestructured(listMetadata, tableLastUpdatedTime)) {
+        return true;
       }
-      if (restructuredBlockExists) {
-        break;
+    }
+
+    return false;
+  }
+
+  public static boolean isRestructured(List<DataFileFooter> listMetadata,
+      long tableLastUpdatedTime) {
+    /*
+     * TODO: only in case of add and drop this variable should be true
+     */
+    for (DataFileFooter dataFileFooter : listMetadata) {
+      // if schema modified timestamp is greater than footer stored schema timestamp,
+      // it indicates it is a restructured block
+      if (tableLastUpdatedTime > dataFileFooter.getSchemaUpdatedTimeStamp()) {
+        return true;
+      }
     }
-    return restructuredBlockExists;
+    return false;
   }
+
+  public static boolean isSorted(TaskBlockInfo taskBlockInfo) throws IOException {
+    String filePath =
+        taskBlockInfo.getAllTableBlockInfoList().iterator().next().get(0).getFilePath();
+    long fileSize =
+        FileFactory.getCarbonFile(filePath, FileFactory.getFileType(filePath)).getSize();
+
+    FileReader fileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath));
+    ByteBuffer buffer =
+        fileReader.readByteBuffer(FileFactory.getUpdatedFilePath(filePath), fileSize - 8, 8);
+    fileReader.finish();
+
+    CarbonFooterReaderV3 footerReader = new CarbonFooterReaderV3(filePath, buffer.getLong());
+    FileFooter3 footer = footerReader.readFooterVersion3();
+
+    return footer.isIs_sort();
--- End diff --

Now during compaction this will read the file footer twice, which will impact compaction performance. To avoid this, expose get/set methods in TableBlockInfo that store the file footer, and in AbstractQueryExecutor.java add a check so that if the footer is already present it is not read again. Please find the code reference: org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java:217
---
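A minimal sketch of the caching suggested above, under stated assumptions: the cached-footer field and its accessors on TableBlockInfo are hypothetical additions, and CarbonUtil.readMetadataFile is assumed to be the existing footer-reading utility.

    // In TableBlockInfo: cache the footer so it is read at most once per block
    // (field and accessors are illustrative additions, not the current API).
    private DataFileFooter dataFileFooter;

    public DataFileFooter getDataFileFooter() {
      return dataFileFooter;
    }

    public void setDataFileFooter(DataFileFooter dataFileFooter) {
      this.dataFileFooter = dataFileFooter;
    }

    // In AbstractQueryExecutor, around the existing footer read (line 217):
    DataFileFooter fileFooter = blockInfo.getDataFileFooter();
    if (fileFooter == null) {
      // footer not cached yet: read it once and remember it for later callers
      fileFooter = CarbonUtil.readMetadataFile(blockInfo);
      blockInfo.setDataFileFooter(fileFooter);
    }

With the footer cached this way, isSorted could consult it directly instead of re-reading the last 8 bytes of the file.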
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3029

Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2328/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3029

Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2120/
---
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3029#discussion_r244721762

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala ---
@@ -118,7 +118,7 @@ class StreamHandoffRDD[K, V](
       CommonUtil.setTempStoreLocation(split.index, carbonLoadModel, true, false)
       // use CompactionResultSortProcessor to sort data dan write to columnar files
       val processor = prepareHandoffProcessor(carbonTable)
-      val status = processor.execute(iteratorList)
+      val status = processor.execute(iteratorList, null)
--- End diff --

Please add a comment explaining why null needs to be passed here.
---
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3029#discussion_r244721944

--- Diff: processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java ---
@@ -105,10 +105,15 @@ public CarbonCompactionExecutor(Map<String, TaskBlockInfo> segmentMapping,
    *
    * @return List of Carbon iterators
--- End diff --

Add a comment for the map type describing what output this method will return.
---
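For illustration, the requested comment might look like the sketch below; the map contract described here (keys distinguishing sorted from unsorted iterators) is an assumption inferred from this PR's split of iterators, not a confirmed API.

    /**
     * Executes a query on the blocks of each task and collects the results.
     *
     * @return map whose key marks whether the iterators run over sorted or
     *         unsorted blocks, and whose value is the list of
     *         RawResultIterator instances for those blocks
     */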
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3029#discussion_r244722390

--- Diff: processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java ---
@@ -400,24 +417,53 @@ private static int getDimensionDefaultCardinality(CarbonDimension dimension) {
    * @param tableLastUpdatedTime
    * @return
    */
-  public static boolean checkIfAnyRestructuredBlockExists(Map<String, TaskBlockInfo> segmentMapping,
-      Map<String, List<DataFileFooter>> dataFileMetadataSegMapping, long tableLastUpdatedTime) {
-    boolean restructuredBlockExists = false;
-    for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) {
-      String segmentId = taskMap.getKey();
+  public static boolean checkIfAnyRestructuredBlockExists(
+      Map<String, TaskBlockInfo> segmentMapping,
+      Map<String, List<DataFileFooter>> dataFileMetadataSegMapping,
+      long tableLastUpdatedTime) {
+
+    for (Map.Entry<String, TaskBlockInfo> segmentEntry : segmentMapping.entrySet()) {
+      String segmentId = segmentEntry.getKey();
       List<DataFileFooter> listMetadata = dataFileMetadataSegMapping.get(segmentId);
-      for (DataFileFooter dataFileFooter : listMetadata) {
-        // if schema modified timestamp is greater than footer stored schema timestamp,
-        // it indicates it is a restructured block
-        if (tableLastUpdatedTime > dataFileFooter.getSchemaUpdatedTimeStamp()) {
-          restructuredBlockExists = true;
-          break;
-        }
+
+      if (isRestructured(listMetadata, tableLastUpdatedTime)) {
+        return true;
       }
-      if (restructuredBlockExists) {
-        break;
+    }
+
+    return false;
+  }
+
+  public static boolean isRestructured(List<DataFileFooter> listMetadata,
--- End diff --

Please add comments for all public methods which you have exposed.
---
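As an example of the requested documentation, hedged Javadoc sketches for the two newly exposed helpers; the wording is illustrative and derived only from the diff above.

    /**
     * Checks whether any footer in the given list was written before the
     * table's last schema change, i.e. whether a restructured block exists.
     *
     * @param listMetadata         footers of the blocks to check
     * @param tableLastUpdatedTime last schema-modification time of the table
     * @return true if at least one restructured block is found
     */
    public static boolean isRestructured(List<DataFileFooter> listMetadata,
        long tableLastUpdatedTime) { ... }

    /**
     * Reads the footer of the task's first block and reports whether its
     * data was written in sorted order.
     *
     * @param taskBlockInfo task whose first block's footer is inspected
     * @return true if the footer flags the data as sorted
     * @throws IOException if the footer cannot be read
     */
    public static boolean isSorted(TaskBlockInfo taskBlockInfo) throws IOException { ... }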
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3029#discussion_r244722477

--- Diff: processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java ---
@@ -156,21 +159,22 @@ public CompactionResultSortProcessor(CarbonLoadModel carbonLoadModel, CarbonTabl
    * This method will iterate over the query result and convert it into a format compatible
    * for data loading
    *
-   * @param resultIteratorList
+   * @param unsortedResultIteratorList
--- End diff --

Update the comment to match the renamed parameter.
---
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3029#discussion_r244723649

--- Diff: processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java ---
@@ -103,15 +104,22 @@ private void initRecordHolderHeap(List<RawResultIterator> rawResultIteratorList)
    * Merge function
    *
    */
-  public boolean execute(List<RawResultIterator> resultIteratorList) throws Exception {
-    initRecordHolderHeap(resultIteratorList);
+  public boolean execute(List<RawResultIterator> unsortedResultIteratorList,
+      List<RawResultIterator> sortedResultIteratorList) throws Exception {
+
+    if (unsortedResultIteratorList != null && unsortedResultIteratorList.size() > 0) {
--- End diff --

No need to add this if check: if the condition unsortedResultIteratorList != null && unsortedResultIteratorList.size() > 0 is true while RowResultMergerProcessor is being called, then the problem is in the caller class. This class's responsibility is to handle only sorted, non-restructured segments.
---
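One way to act on this comment, sketched under the assumption that failing fast is acceptable here (the exception type and message are illustrative, not part of the PR):

    // At the top of RowResultMergerProcessor.execute(...), instead of branching:
    if (unsortedResultIteratorList != null && !unsortedResultIteratorList.isEmpty()) {
      // Unsorted input means the caller selected the wrong processor;
      // this class handles only sorted, non-restructured segments.
      throw new IllegalArgumentException(
          "RowResultMergerProcessor expects only sorted iterators; "
              + "unsorted iterators should go to CompactionResultSortProcessor");
    }
    initRecordHolderHeap(sortedResultIteratorList);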
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3029

Build Failed with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10376/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3029

Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2123/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3029

Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2127/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3029

Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2333/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3029

Build Failed with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10381/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3029

Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2334/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3029

Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2128/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3029

Build Failed with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10382/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3029

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2129/
---
Github user NamanRastogi commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3029#discussion_r244766044

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala ---
@@ -118,7 +118,7 @@ class StreamHandoffRDD[K, V](
       CommonUtil.setTempStoreLocation(split.index, carbonLoadModel, true, false)
       // use CompactionResultSortProcessor to sort data dan write to columnar files
       val processor = prepareHandoffProcessor(carbonTable)
-      val status = processor.execute(iteratorList)
+      val status = processor.execute(iteratorList, null)
--- End diff --

Done.
---
Github user NamanRastogi commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3029#discussion_r244766765

--- Diff: processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java ---
@@ -156,21 +159,22 @@ public CompactionResultSortProcessor(CarbonLoadModel carbonLoadModel, CarbonTabl
    * This method will iterate over the query result and convert it into a format compatible
    * for data loading
    *
-   * @param resultIteratorList
+   * @param unsortedResultIteratorList
--- End diff --

Done.
---
Github user NamanRastogi commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3029#discussion_r244766555

--- Diff: processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java ---
@@ -400,24 +417,53 @@ private static int getDimensionDefaultCardinality(CarbonDimension dimension) {
    * @param tableLastUpdatedTime
    * @return
    */
-  public static boolean checkIfAnyRestructuredBlockExists(Map<String, TaskBlockInfo> segmentMapping,
-      Map<String, List<DataFileFooter>> dataFileMetadataSegMapping, long tableLastUpdatedTime) {
-    boolean restructuredBlockExists = false;
-    for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) {
-      String segmentId = taskMap.getKey();
+  public static boolean checkIfAnyRestructuredBlockExists(
+      Map<String, TaskBlockInfo> segmentMapping,
+      Map<String, List<DataFileFooter>> dataFileMetadataSegMapping,
+      long tableLastUpdatedTime) {
+
+    for (Map.Entry<String, TaskBlockInfo> segmentEntry : segmentMapping.entrySet()) {
+      String segmentId = segmentEntry.getKey();
       List<DataFileFooter> listMetadata = dataFileMetadataSegMapping.get(segmentId);
-      for (DataFileFooter dataFileFooter : listMetadata) {
-        // if schema modified timestamp is greater than footer stored schema timestamp,
-        // it indicates it is a restructured block
-        if (tableLastUpdatedTime > dataFileFooter.getSchemaUpdatedTimeStamp()) {
-          restructuredBlockExists = true;
-          break;
-        }
+
+      if (isRestructured(listMetadata, tableLastUpdatedTime)) {
+        return true;
       }
-      if (restructuredBlockExists) {
-        break;
+    }
+
+    return false;
+  }
+
+  public static boolean isRestructured(List<DataFileFooter> listMetadata,
--- End diff --

Done.
---