GitHub user NamanRastogi opened a pull request:
https://github.com/apache/carbondata/pull/3029

[CARBONDATA-3200] No-Sort compaction

When data is loaded with SORT_SCOPE as NO_SORT and then compacted, the data remains unsorted. This has little effect on correctness, but the main purpose of compaction is to pack the data better and improve query performance. The expected behaviour of compaction is therefore to sort the data, so that queries after compaction run faster. The columns to sort on are taken from SORT_COLUMNS.

- [ ] Any interfaces changed? --> No
- [ ] Any backward compatibility impacted? --> No
- [ ] Document update required? --> No
- [ ] Testing done
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NamanRastogi/carbondata nosort_compaction

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/3029.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3029

----

commit f9e0142149ccd83a48f828bf032842b2a18ce90d
Author: namanrastogi <naman.rastogi.52@...>
Date: 2018-12-27T13:26:18Z

    Added HybridSortProcessor

commit d406a9f595558f2f027a56425b0f432b534e47c8
Author: namanrastogi <naman.rastogi.52@...>
Date: 2018-12-21T16:48:15Z

    Added flow for HybridSorterProcessor. TODO: Implement HybridSorterProcessor itself.

----

--- |
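As a rough illustration of the scenario the PR targets (table, column, and path names below are hypothetical, and exact DDL syntax may differ across CarbonData versions):

```sql
-- Hypothetical table: loads are fast because segments are written unsorted.
CREATE TABLE sales (id INT, country STRING, amount DOUBLE)
STORED AS carbondata
TBLPROPERTIES ('SORT_COLUMNS'='country', 'SORT_SCOPE'='NO_SORT');

LOAD DATA INPATH 'hdfs://.../sales_1.csv' INTO TABLE sales;
LOAD DATA INPATH 'hdfs://.../sales_2.csv' INTO TABLE sales;

-- With this change, compaction also sorts the merged segment on
-- SORT_COLUMNS, so later filters on country can prune more data.
ALTER TABLE sales COMPACT 'MINOR';
```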
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3029 Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2254/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3029 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2047/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3029 Build Failed with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10299/ --- |
Github user qiuchenjian commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3029#discussion_r244252283

--- Diff: processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java ---
@@ -105,10 +107,10 @@ public CarbonCompactionExecutor(Map<String, TaskBlockInfo> segmentMapping,
   *
   * @return List of Carbon iterators
--- End diff --

```suggestion
   * @return The key represents Carbon iterator
   *         The value represents whether it's sorted
```

Change the description of this method.

--- |
Github user qiuchenjian commented on the issue:
https://github.com/apache/carbondata/pull/3029 Does this need a parameter to control whether to sort when SORT_SCOPE is NO_SORT? Maybe the user only wants to merge small segments into a big segment, and slower merging may be unacceptable. --- |
Github user NamanRastogi commented on the issue:
https://github.com/apache/carbondata/pull/3029 @qiuchenjian We don't need a parameter to control whether to sort the data while compacting. The sole purpose of compaction is to improve subsequent query performance, and even if compaction gets slower, it is doing what is intended. --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3029#discussion_r244319031

--- Diff: processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java ---
@@ -126,17 +128,24 @@ public CarbonCompactionExecutor(Map<String, TaskBlockInfo> segmentMapping,
       // for each segment get taskblock info
       TaskBlockInfo taskBlockInfo = taskMap.getValue();
       Set<String> taskBlockListMapping = taskBlockInfo.getTaskSet();
+      // Check if block needs sorting or not
+      boolean sortingRequired =
+          CarbonCompactionUtil.isRestructured(listMetadata, carbonTable.getTableLastUpdatedTime())
+              || !CarbonCompactionUtil.isSorted(taskBlockInfo);
--- End diff --

Here we are reading each and every carbondata file footer, which will impact compaction performance. I feel we should discuss and consider adding an isSort flag to the carbonindex file as well, to simplify this.

--- |
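The check being reviewed above can be modelled on its own: a compaction task must take the sort flow if any input block was restructured (its footer predates the last schema update) or was written unsorted. This is a minimal, self-contained sketch; `BlockFooter` and `needsSortFlow` are illustrative names, not CarbonData's actual API.

```java
import java.util.List;

public class SortFlowDecision {

    // Illustrative stand-in for the per-file footer metadata that the real
    // code reads from each carbondata file (or, as suggested in the review,
    // could read from an isSort flag in the carbonindex file instead).
    static class BlockFooter {
        final long schemaUpdatedTime; // schema timestamp stored in the footer
        final boolean sorted;         // whether the block was written sorted

        BlockFooter(long schemaUpdatedTime, boolean sorted) {
            this.schemaUpdatedTime = schemaUpdatedTime;
            this.sorted = sorted;
        }
    }

    // The sort flow is required if any block is restructured (table schema
    // updated after the block was written) or any block is unsorted.
    static boolean needsSortFlow(List<BlockFooter> footers, long tableLastUpdatedTime) {
        for (BlockFooter f : footers) {
            if (tableLastUpdatedTime > f.schemaUpdatedTime || !f.sorted) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<BlockFooter> blocks = List.of(
            new BlockFooter(100L, true),
            new BlockFooter(100L, false)); // one unsorted block forces a sort
        System.out.println(needsSortFlow(blocks, 100L));
    }
}
```

The cost concern in the comment is that computing `sorted` requires opening every data file footer, which is why caching the flag in the index file was proposed.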
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3029 Build Failed with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10325/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3029 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2071/ --- |
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3029#discussion_r244331702

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---
@@ -207,18 +212,34 @@ class CarbonMergerRDD[K, V](
           carbonMergerMapping.campactionType,
           factTableName,
           partitionSpec)
+
+      } else if (CarbonCompactionUtil
+        .anyUnsortedOrRestructuredBlocks(rawResultIteratorList, rawResultIteratorBooleanMap)) {
+
+        LOGGER.info("HybridSortProcessor flow is selected")
+        processor = new HybridSortProcessor(
+          carbonLoadModel,
+          carbonTable,
+          segmentProperties,
+          carbonMergerMapping.campactionType,
+          factTableName,
+          partitionSpec,
+          rawResultIteratorBooleanMap)
+
      } else {
+        LOGGER.info("RowResultMergerProcessor flow is selected")
-        processor =
-          new RowResultMergerProcessor(
-            databaseName,
-            factTableName,
-            segmentProperties,
-            tempStoreLoc,
-            carbonLoadModel,
-            carbonMergerMapping.campactionType,
-            partitionSpec)
+        processor = new RowResultMergerProcessor(
--- End diff --

revert this change

--- |
Github user kumarvishal09 commented on the issue:
https://github.com/apache/carbondata/pull/3029 @NamanRastogi Please add detailed comments for all the changed code. --- |
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3029#discussion_r244332001

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---
@@ -166,8 +164,9 @@ class CarbonMergerRDD[K, V](
     carbonLoadModel.setTablePath(tablePath)
     // check for restructured block
     // TODO: only in case of add and drop this variable should be true
-    val restructuredBlockExists: Boolean = CarbonCompactionUtil
-      .checkIfAnyRestructuredBlockExists(segmentMapping,
+    val restructuredBlockExists: Boolean =
+      CarbonCompactionUtil.checkIfAnyRestructuredBlockExists(
--- End diff --

revert this change

--- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3029 Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2276/ --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/3029 @NamanRastogi A lot of code is duplicated here. Please try to unify it with the other compaction processors to avoid the duplication. --- |
Github user kumarvishal09 commented on the issue:
https://github.com/apache/carbondata/pull/3029 @NamanRastogi You can merge HybridSorter into CompactionResultSortProcessor. For unsorted files the flow stays the same; for sorted files, add an adapter (InMemorySortTempFileChunkHolder) on top of RawResultIterator that is in line with the SortTempFileChunkHolder class, so the interface stays the same. In SingleThreadFinalMerger, expose one method that takes a list of sorted RawResultIterators and adds them to the record holder heap (PriorityQueue). In InMemorySortTempFileChunkHolder you have to convert Object[] to IntermediateSortTempRow in the getRow method. --- |
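The record-holder-heap design described above can be sketched independently of CarbonData's classes: each sorted input is wrapped in a small holder exposing its current row (the role the proposed InMemorySortTempFileChunkHolder adapter would play), and a PriorityQueue repeatedly yields the holder with the smallest row. All names here (`Holder`, `mergeSorted`) are illustrative, not the project's API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class HeapMergeSketch {

    // Adapter over one already-sorted stream: holds the stream's current row
    // so the heap can compare holders by their front elements.
    static class Holder {
        final Iterator<int[]> it;
        int[] current;

        Holder(Iterator<int[]> it) {
            this.it = it;
            this.current = it.next(); // caller guarantees a non-empty input
        }

        boolean advance() {
            if (!it.hasNext()) {
                return false;
            }
            current = it.next();
            return true;
        }
    }

    // K-way merge of sorted row streams into one sorted list, comparing on
    // the first column (a stand-in for the table's SORT_COLUMNS).
    static List<int[]> mergeSorted(List<Iterator<int[]>> inputs) {
        PriorityQueue<Holder> heap =
            new PriorityQueue<>(Comparator.comparingInt((Holder h) -> h.current[0]));
        for (Iterator<int[]> it : inputs) {
            if (it.hasNext()) {
                heap.add(new Holder(it));
            }
        }
        List<int[]> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Holder h = heap.poll();
            out.add(h.current);
            if (h.advance()) {
                heap.add(h); // re-insert the holder with its next row
            }
        }
        return out;
    }
}
```

In the real flow, unsorted inputs would first be sorted (the existing CompactionResultSortProcessor path) while already-sorted inputs feed the heap directly, which is what makes the hybrid approach cheaper than re-sorting everything.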
Github user kumarvishal09 commented on the issue:
https://github.com/apache/carbondata/pull/3029 @NamanRastogi Please resolve the conflicts and fix all failures. --- |
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3029#discussion_r244712666

--- Diff: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/InMemorySortTempChunkHolder.java ---
@@ -0,0 +1,109 @@
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.util.Comparator;
+
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
+import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
+import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+
+public class InMemorySortTempChunkHolder extends SortTempFileChunkHolder {
+
+  private final RawResultIterator rawResultIterator;
+
+  private IntermediateSortTempRow returnRow;
+//  private Object[] row;
+
+  private SegmentProperties segmentProperties;
+
+  private CarbonColumn[] noDicAndComplexColumns;
+
+  private TableFieldStat tableFieldStat;
+  private Comparator<IntermediateSortTempRow> comparator;
+
+  public InMemorySortTempChunkHolder(RawResultIterator rawResultIterator,
+      SegmentProperties segmentProperties, CarbonColumn[] noDicAndComplexColumns,
+      SortParameters sortParameters) {
+    this.rawResultIterator = rawResultIterator;
+    this.segmentProperties = segmentProperties;
+    this.noDicAndComplexColumns = noDicAndComplexColumns;
+
+    this.tableFieldStat = new TableFieldStat(sortParameters);
+    this.comparator = new IntermediateSortTempRowComparator(
+        tableFieldStat.getIsSortColNoDictFlags(), tableFieldStat.getNoDictDataType());
+  }
+
+  public void initialise() throws CarbonSortKeyAndGroupByException {
+    // Not required for In memory case as it will not initialize anything
+    throw new UnsupportedOperationException("Operation Not supported");
+  }
+
+  public void readRow() throws CarbonSortKeyAndGroupByException {
+    Object[] row = this.rawResultIterator.next();
+
+    //TODO add code to get directly Object[] Instead Of CarbonRow Object
+    CarbonRow carbonRow =
+        WriteStepRowUtil.fromMergerRow(row, segmentProperties, noDicAndComplexColumns);
+    Object[] data = carbonRow.getData();
+    returnRow = new IntermediateSortTempRow(
+        (int[]) data[WriteStepRowUtil.DICTIONARY_DIMENSION],
+        (Object[]) data[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX],
+        (Object[]) data[WriteStepRowUtil.MEASURE]);
+  }
+
+  public int getEntryCount() {
+    // this will not be used for intermediate sorting
+    throw new UnsupportedOperationException("Operation Not supported");
+  }
+
+  /**
+   * below method will be used to check whether any more records are present
+   * in file or not
+   *
+   * @return more row present in file
+   */
+  public boolean hasNext() {
+    return this.rawResultIterator.hasNext();
+  }
+
+  @Override public boolean equals(Object obj) {
+    return super.equals(obj);
+  }
+
+  @Override public int hashCode() {
+    int hash = rawResultIterator.hashCode();
+    hash += segmentProperties.hashCode();
+    return hash;
+  }
+
+  @Override public int compareTo(SortTempFileChunkHolder other) {
+    return comparator.compare(returnRow, other.getRow());
+  }
+
+  /**
+   * Below method will be used to close streams
+   */
+  public void closeStream() {
+    rawResultIterator.close();
+  }
+
+  /*
+   * below method will be used to get the sort temp row
+   *
+   * @return row
+   */
+  public IntermediateSortTempRow getRow() {
+//    //TODO add code to get directly Object[] Instead Of CarbonRow Object
--- End diff --

remove this commented code

--- |
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3029#discussion_r244713354

--- Diff: processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java ---
@@ -400,24 +417,53 @@ private static int getDimensionDefaultCardinality(CarbonDimension dimension) {
   * @param tableLastUpdatedTime
   * @return
   */
-  public static boolean checkIfAnyRestructuredBlockExists(Map<String, TaskBlockInfo> segmentMapping,
-      Map<String, List<DataFileFooter>> dataFileMetadataSegMapping, long tableLastUpdatedTime) {
-    boolean restructuredBlockExists = false;
-    for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) {
-      String segmentId = taskMap.getKey();
+  public static boolean checkIfAnyRestructuredBlockExists(
+      Map<String, TaskBlockInfo> segmentMapping,
+      Map<String, List<DataFileFooter>> dataFileMetadataSegMapping,
+      long tableLastUpdatedTime) {
+
+    for (Map.Entry<String, TaskBlockInfo> segmentEntry : segmentMapping.entrySet()) {
+      String segmentId = segmentEntry.getKey();
       List<DataFileFooter> listMetadata = dataFileMetadataSegMapping.get(segmentId);
-      for (DataFileFooter dataFileFooter : listMetadata) {
-        // if schema modified timestamp is greater than footer stored schema timestamp,
-        // it indicates it is a restructured block
-        if (tableLastUpdatedTime > dataFileFooter.getSchemaUpdatedTimeStamp()) {
-          restructuredBlockExists = true;
-          break;
-        }
+
+      if (isRestructured(listMetadata, tableLastUpdatedTime)) {
+        return true;
      }
-      if (restructuredBlockExists) {
-        break;
+    }
+
+    return false;
+  }
+
+  public static boolean isRestructured(List<DataFileFooter> listMetadata,
+      long tableLastUpdatedTime) {
+    /*
+     * TODO: only in case of add and drop this variable should be true
+     */
+    for (DataFileFooter dataFileFooter : listMetadata) {
+      // if schema modified timestamp is greater than footer stored schema timestamp,
+      // it indicates it is a restructured block
+      if (tableLastUpdatedTime > dataFileFooter.getSchemaUpdatedTimeStamp()) {
+        return true;
      }
    }
-    return restructuredBlockExists;
+    return false;
  }
+
+  public static boolean isSorted(TaskBlockInfo taskBlockInfo) throws IOException {
--- End diff --

Use CarbonUtil.readMetadataFile(tableBlockInfo) for reading the file footer; if the carbondata file version is V1/V2, this code will not be able to read the footer.

--- |
Github user NamanRastogi commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3029#discussion_r244713711

--- Diff: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/InMemorySortTempChunkHolder.java ---
+  public IntermediateSortTempRow getRow() {
+//    //TODO add code to get directly Object[] Instead Of CarbonRow Object
--- End diff --

Done.

--- |