GitHub user xuchuanyin opened a pull request:
https://github.com/apache/carbondata/pull/2133

[CARBONDATA-2304][Compaction] Prefetch rowbatch during compaction

Add a configuration to enable prefetch during compaction.

During compaction, CarbonData queries the segments and retrieves the rows, then sorts the rows and produces the final carbondata file. Currently, retrieving the rows performs poorly, so prefetching the rows should improve compaction performance.

In my local tests, compacting 4 segments of 100 thousand rows each takes 30s with prefetch and 50s without prefetch. In my tests on a larger cluster, compacting 6 segments of 18GB raw data each takes 45min with prefetch and 57min without prefetch.

Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily:

 - [x] Any interfaces changed?
   `NO`
 - [x] Any backward compatibility impacted?
   `NO`
 - [x] Document update required?
   `Add a configuration, will update it later`
 - [x] Testing done
       Please provide details on
       - Whether new unit test cases have been added or why no new tests are required?
         `Yes`
       - How it is tested? Please attach test report.
         `Tested in local and a 3-node cluster`
       - Is it a performance related change? Please attach the performance test report.
         `Compaction performance has been enhanced by 25+%`
       - Any additional information to help reviewers in testing this change.
 - [x] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
   `Not related`

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xuchuanyin/carbondata 0402_compaction_prefetch

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2133.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2133

----
commit 41869effa326052b46088f68dd1d6ccc5f7525e5
Author: xuchuanyin <xuchuanyin@...>
Date:   2018-04-02T12:38:17Z

    Prefetch rowbatch during compaction

    Add a configuration to enable prefetch during compaction.

----

---
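For readers unfamiliar with the idea, the prefetch described in the pull request can be pictured as double buffering: while the consumer drains the current batch of rows, a background thread is already fetching the next one. The following is only a minimal sketch under stated assumptions, not the code from this PR. The class name `PrefetchingIterator`, its fields, and the use of `null` as an end-of-input marker are all hypothetical, and it works on a plain `Iterator<List<T>>` rather than CarbonData's `CarbonIterator<RowBatch>`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration of batch prefetching; not the PR's RawResultIterator.
class PrefetchingIterator<T> implements Iterator<T>, AutoCloseable {
  private final Iterator<List<T>> batches;
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final Callable<List<T>> fetchTask = this::fetch;
  private List<T> current;          // batch being consumed; null once input is exhausted
  private Future<List<T>> pending;  // next batch, fetched in the background
  private int idx;                  // read position inside 'current'

  PrefetchingIterator(Iterator<List<T>> batches) {
    this.batches = batches;
    this.current = fetch();  // the first batch is fetched synchronously
    this.pending = (current == null) ? null : executor.submit(fetchTask);
  }

  // Returns the next batch, or null when the source has no more batches.
  private List<T> fetch() {
    return batches.hasNext() ? batches.next() : null;
  }

  private List<T> await(Future<List<T>> future) {
    try {
      return future.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    }
  }

  @Override
  public boolean hasNext() {
    // When the current batch is drained, swap in the prefetched batch
    // and immediately start fetching the one after it.
    while (current != null && idx >= current.size()) {
      current = await(pending);
      idx = 0;
      pending = (current == null) ? null : executor.submit(fetchTask);
    }
    return current != null;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return current.get(idx++);
  }

  @Override
  public void close() {
    executor.shutdownNow();
  }

  public static void main(String[] args) {
    List<List<Integer>> source = Arrays.asList(
        Arrays.asList(1, 2), Collections.<Integer>emptyList(), Arrays.asList(3));
    List<Integer> rows = new ArrayList<>();
    try (PrefetchingIterator<Integer> it = new PrefetchingIterator<>(source.iterator())) {
      while (it.hasNext()) {
        rows.add(it.next());
      }
    }
    System.out.println(rows);  // prints [1, 2, 3]
  }
}
```

In this sketch only one thread touches the source iterator at a time, because a new fetch is submitted only after the previous one has been awaited. Closing the iterator shuts the executor down so that its worker thread does not outlive the scan.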
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2133 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4765/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2133 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3537/ --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2133 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4270/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2133 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4772/ --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2133 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4275/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2133 Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3544/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2133 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4854/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2133 Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3632/ --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2133 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4323/ --- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2133#discussion_r181135852

    --- Diff: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---
    @@ -1642,6 +1642,14 @@
       public static final String CARBON_SEARCH_MODE_THREAD_DEFAULT = "3";
    +  /*
    +   * whether to enable prefetch during compaction
    --- End diff --

Can you describe in more detail what is prefetched?

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2133#discussion_r181136505

    --- Diff: core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java ---
    @@ -39,106 +53,131 @@
        */
       private CarbonIterator<RowBatch> detailRawQueryResultIterator;
    -  /**
    -   * Counter to maintain the row counter.
    -   */
    -  private int counter = 0;
    -
    -  private Object[] currentConveretedRawRow = null;
    -
    -  /**
    -   * LOGGER
    -   */
    -  private static final LogService LOGGER =
    -      LogServiceFactory.getLogService(RawResultIterator.class.getName());
    -
    -  /**
    -   * batch of the result.
    -   */
    -  private RowBatch batch;
    +  private boolean prefetchEnabled;
    +  private List<Object[]> currentBuffer;
    +  private List<Object[]> backupBuffer;
    +  private int currentIdxInBuffer;
    +  private ExecutorService executorService;
    +  private Future<Void> fetchFuture;
    +  private Object[] currentRawRow = null;
    +  private boolean isBackupFilled = false;
       public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator,
    -      SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) {
    +      SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties,
    +      boolean isStreamingHandOff) {
         this.detailRawQueryResultIterator = detailRawQueryResultIterator;
         this.sourceSegProperties = sourceSegProperties;
         this.destinationSegProperties = destinationSegProperties;
    +    this.executorService = Executors.newFixedThreadPool(1);
    +
    +    if (!isStreamingHandOff) {
    +      init();
    +    }
       }
    -  @Override public boolean hasNext() {
    +  private void init() {
    +    this.prefetchEnabled = CarbonProperties.getInstance().getProperty(
    +        CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE,
    +        CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true");
    +    try {
    +      new RowsFetcher(false).call();
    +      if (prefetchEnabled) {
    +        this.fetchFuture = executorService.submit(new RowsFetcher(true));
    +      }
    +    } catch (Exception e) {
    +      LOGGER.error(e, "Error occurs while fetching records");
    +      throw new RuntimeException(e);
    +    }
    +  }
    +
    +  /**
    +   * fetch rows
    +   */
    +  private final class RowsFetcher implements Callable<Void> {
    +    private boolean isBackupFilling;
    -    if (null == batch || checkIfBatchIsProcessedCompletely(batch)) {
    -      if (detailRawQueryResultIterator.hasNext()) {
    -        batch = null;
    -        batch = detailRawQueryResultIterator.next();
    -        counter = 0; // batch changed so reset the counter.
    +    private RowsFetcher(boolean isBackupFilling) {
    +      this.isBackupFilling = isBackupFilling;
    +    }
    +
    +    @Override
    +    public Void call() throws Exception {
    +      if (isBackupFilling) {
    +        backupBuffer = fetchRows();
    +        isBackupFilled = true;
           } else {
    -        return false;
    +        currentBuffer = fetchRows();
           }
    +      return null;
         }
    +  }
    -    if (!checkIfBatchIsProcessedCompletely(batch)) {
    -      return true;
    +  private List<Object[]> fetchRows() {
    +    if (detailRawQueryResultIterator.hasNext()) {
    +      return detailRawQueryResultIterator.next().getRows();
         } else {
    -      return false;
    +      return new ArrayList<>();
         }
       }
    -  @Override public Object[] next() {
    -    if (null == batch) { // for 1st time
    -      batch = detailRawQueryResultIterator.next();
    -    }
    -    if (!checkIfBatchIsProcessedCompletely(batch)) {
    -      try {
    -        if (null != currentConveretedRawRow) {
    -          counter++;
    -          Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow;
    -          currentConveretedRawRow = null;
    -          return currentConveretedRawRowTemp;
    +  private void fillDataFromPrefetch() {
    +    try {
    +      if (currentIdxInBuffer >= currentBuffer.size() && 0 != currentIdxInBuffer) {
    +        if (prefetchEnabled) {
    +          if (!isBackupFilled) {
    +            fetchFuture.get();
    +          }
    +          // copy backup buffer to current buffer and fill backup buffer asyn
    +          currentIdxInBuffer = 0;
    +          currentBuffer = backupBuffer;
    +          isBackupFilled = false;
    +          fetchFuture = executorService.submit(new RowsFetcher(true));
    +        } else {
    +          currentIdxInBuffer = 0;
    +          new RowsFetcher(false).call();
             }
    -        return convertRow(batch.getRawRow(counter++));
    -      } catch (KeyGenException e) {
    -        LOGGER.error(e.getMessage());
    -        return null;
           }
    -    } else { // completed one batch.
    -      batch = null;
    -      batch = detailRawQueryResultIterator.next();
    -      counter = 0;
    +    } catch (Exception e) {
    +      throw new RuntimeException(e);
         }
    -    try {
    -      if (null != currentConveretedRawRow) {
    -        counter++;
    -        Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow;
    -        currentConveretedRawRow = null;
    -        return currentConveretedRawRowTemp;
    -      }
    +  }
    -      return convertRow(batch.getRawRow(counter++));
    -    } catch (KeyGenException e) {
    -      LOGGER.error(e.getMessage());
    -      return null;
    +  private void popRow() {
    --- End diff --

Please add a comment for this method.

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2133#discussion_r181136264

    --- Diff: core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java ---
    @@ -39,106 +53,131 @@
        */
       private CarbonIterator<RowBatch> detailRawQueryResultIterator;
    -  /**
    -   * Counter to maintain the row counter.
    -   */
    -  private int counter = 0;
    -
    -  private Object[] currentConveretedRawRow = null;
    -
    -  /**
    -   * LOGGER
    -   */
    -  private static final LogService LOGGER =
    -      LogServiceFactory.getLogService(RawResultIterator.class.getName());
    -
    -  /**
    -   * batch of the result.
    -   */
    -  private RowBatch batch;
    +  private boolean prefetchEnabled;
    +  private List<Object[]> currentBuffer;
    +  private List<Object[]> backupBuffer;
    +  private int currentIdxInBuffer;
    +  private ExecutorService executorService;
    +  private Future<Void> fetchFuture;
    +  private Object[] currentRawRow = null;
    +  private boolean isBackupFilled = false;
       public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator,
    -      SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) {
    +      SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties,
    +      boolean isStreamingHandOff) {
         this.detailRawQueryResultIterator = detailRawQueryResultIterator;
         this.sourceSegProperties = sourceSegProperties;
         this.destinationSegProperties = destinationSegProperties;
    +    this.executorService = Executors.newFixedThreadPool(1);
    +
    +    if (!isStreamingHandOff) {
    +      init();
    +    }
       }
    -  @Override public boolean hasNext() {
    +  private void init() {
    +    this.prefetchEnabled = CarbonProperties.getInstance().getProperty(
    +        CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE,
    +        CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true");
    +    try {
    +      new RowsFetcher(false).call();
    +      if (prefetchEnabled) {
    +        this.fetchFuture = executorService.submit(new RowsFetcher(true));
    +      }
    +    } catch (Exception e) {
    +      LOGGER.error(e, "Error occurs while fetching records");
    +      throw new RuntimeException(e);
    +    }
    +  }
    +
    +  /**
    +   * fetch rows
    +   */
    +  private final class RowsFetcher implements Callable<Void> {
    +    private boolean isBackupFilling;
    -    if (null == batch || checkIfBatchIsProcessedCompletely(batch)) {
    -      if (detailRawQueryResultIterator.hasNext()) {
    -        batch = null;
    -        batch = detailRawQueryResultIterator.next();
    -        counter = 0; // batch changed so reset the counter.
    +    private RowsFetcher(boolean isBackupFilling) {
    +      this.isBackupFilling = isBackupFilling;
    +    }
    +
    +    @Override
    +    public Void call() throws Exception {
    +      if (isBackupFilling) {
    +        backupBuffer = fetchRows();
    +        isBackupFilled = true;
           } else {
    -        return false;
    +        currentBuffer = fetchRows();
           }
    +      return null;
         }
    +  }
    -    if (!checkIfBatchIsProcessedCompletely(batch)) {
    -      return true;
    +  private List<Object[]> fetchRows() {
    +    if (detailRawQueryResultIterator.hasNext()) {
    +      return detailRawQueryResultIterator.next().getRows();
         } else {
    -      return false;
    +      return new ArrayList<>();
         }
       }
    -  @Override public Object[] next() {
    -    if (null == batch) { // for 1st time
    -      batch = detailRawQueryResultIterator.next();
    -    }
    -    if (!checkIfBatchIsProcessedCompletely(batch)) {
    -      try {
    -        if (null != currentConveretedRawRow) {
    -          counter++;
    -          Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow;
    -          currentConveretedRawRow = null;
    -          return currentConveretedRawRowTemp;
    +  private void fillDataFromPrefetch() {
    +    try {
    +      if (currentIdxInBuffer >= currentBuffer.size() && 0 != currentIdxInBuffer) {
    +        if (prefetchEnabled) {
    +          if (!isBackupFilled) {
    +            fetchFuture.get();
    +          }
    +          // copy backup buffer to current buffer and fill backup buffer asyn
    +          currentIdxInBuffer = 0;
    +          currentBuffer = backupBuffer;
    +          isBackupFilled = false;
    +          fetchFuture = executorService.submit(new RowsFetcher(true));
    +        } else {
    +          currentIdxInBuffer = 0;
    +          new RowsFetcher(false).call();
             }
    -        return convertRow(batch.getRawRow(counter++));
    -      } catch (KeyGenException e) {
    -        LOGGER.error(e.getMessage());
    -        return null;
           }
    -    } else { // completed one batch.
    -      batch = null;
    -      batch = detailRawQueryResultIterator.next();
    -      counter = 0;
    +    } catch (Exception e) {
    +      throw new RuntimeException(e);
         }
    -    try {
    -      if (null != currentConveretedRawRow) {
    -        counter++;
    -        Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow;
    -        currentConveretedRawRow = null;
    -        return currentConveretedRawRowTemp;
    -      }
    +  }
    -      return convertRow(batch.getRawRow(counter++));
    -    } catch (KeyGenException e) {
    -      LOGGER.error(e.getMessage());
    -      return null;
    +  private void popRow() {
    +    fillDataFromPrefetch();
    +    currentRawRow = currentBuffer.get(currentIdxInBuffer);
    +    currentIdxInBuffer++;
    +  }
    +
    +  private void pickRow() {
    +    fillDataFromPrefetch();
    +    currentRawRow = currentBuffer.get(currentIdxInBuffer);
    +  }
    +
    +  @Override public boolean hasNext() {
    --- End diff --

Please move @Override to the previous line.

---
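The quoted diff distinguishes `popRow()`, which reads the row at the cursor and advances, from `pickRow()`, which reads it without advancing. As a rough sketch of that distinction, assuming a single in-memory buffer and no prefetch, a cursor could look like the following. `RowCursor` and `peek()` are hypothetical names for illustration, not part of the PR. The sketch also places each `@Override` on its own line, in line with the review comment.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical cursor over one buffer of rows; names are illustrative only.
class RowCursor implements Iterator<Object[]> {
  private final List<Object[]> buffer;
  private int idx;  // position of the next row to hand out

  RowCursor(List<Object[]> buffer) {
    this.buffer = buffer;
  }

  // Reads the row at the cursor without advancing (compare pickRow in the diff).
  Object[] peek() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return buffer.get(idx);
  }

  @Override
  public boolean hasNext() {
    return idx < buffer.size();
  }

  // Reads the row at the cursor and advances (compare popRow in the diff).
  @Override
  public Object[] next() {
    Object[] row = peek();
    idx++;
    return row;
  }

  public static void main(String[] args) {
    List<Object[]> rows = new ArrayList<>();
    rows.add(new Object[] {1, "a"});
    rows.add(new Object[] {2, "b"});
    RowCursor cursor = new RowCursor(rows);
    System.out.println(cursor.peek()[0]);  // prints 1
    System.out.println(cursor.peek()[0]);  // prints 1 again, the cursor has not moved
    System.out.println(cursor.next()[0]);  // prints 1, then advances
    System.out.println(cursor.next()[0]);  // prints 2
  }
}
```

Keeping the non-advancing read separate lets a caller inspect the upcoming row, for example to answer `hasNext()`, without consuming it.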
Github user xuchuanyin commented on the issue:
https://github.com/apache/carbondata/pull/2133 @jackylk The review comments have been addressed. --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2133 Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3868/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2133 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5094/ --- |