GitHub user xuchuanyin opened a pull request:
https://github.com/apache/carbondata/pull/2906 [CARBONDATA-3088][Compaction] support prefetch for compaction Current compaction performance is low. By adding logs to observe the compaction procedure, we found that in `CarbonFactDataHandlerColumnar.addDataToStore(CarbonRow)`, it will wait about 30ms before submitting a new TablePage producer. Since the method `addDataToStore` is called in single thread, it will result the waiting every 32000 records since it will collect 32000 records to form a TablePage. To reduce the waiting time, we can prepare the 32000 records ahead. This can be achived using prefetch. We will prepare two buffers, one will provide the records to the downstream (`addDataToStore`) and the other one will prepare the records asynchronously. The first is called working buffer and the second is called backup buffer. Once working buffer is exhausted, the two buffers will exchange their roles: the backup buffer will be the new working buffer and the old working buffer will be the new backup buffer and it will be filled asynchronously. Two parameters are involved for this feature: 1. carbon.detail.batch.size: This is an existed parameter and the default value is 100. This parameter controls the batch size of records that return to the client. For normal query, it is OK to keep it as 100. But for compaction, since all the records will be operated, we suggest you to set it to a larger value such as 32000. (32000 is the max rows for a table page that the down stream wants). 2. carbon.compaction.prefetch.enable: This is a new parameter and the default value is `false` (We may change it to `true` later). This parameter controls whether we will prefetch the records for compation. By using this prefetch feature, we can enhance the performance for compaction. More test results can be found in the PR description. code branch | prefetch | carbon.detail.batch.size (default 100) | time taken per load (s) | time taken to compact 3 loads (s) | Enhancement -- | -- | -- | -- | -- | -- master | NA | 100 | 447.4 | 445.9 | 450.1 | 661.3 | Baseline master | NA | 32000 | 441.5 | 454.4 | 456.8 | 641.2 | +3.0% current PR | enable | 100 | 445.3 | 450.2 | 445.3 | 411.8 | +37.7% current PR | enable | 32000 | 438.7 | 446.8 | 441.8 | 333.1 | +49.6% current PR | disable | 100 | 458.1 | 459.4 | 450.9 | 659.5 | +0.3% current PR | disable | 32000 | Â | Â | Â | Â | Â Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily: - [ ] Any interfaces changed? - [ ] Any backward compatibility impacted? - [ ] Document update required? - [ ] Testing done Please provide details on - Whether new unit test cases have been added or why no new tests are required? - How it is tested? Please attach test report. - Is it a performance related change? Please attach the performance test report. - Any additional information to help reviewers in testing this change. - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. You can merge this pull request into a Git repository by running: $ git pull https://github.com/xuchuanyin/carbondata 181105_opt_prefetch_compaction Alternatively you can review and apply these changes as the patch at: https://github.com/apache/carbondata/pull/2906.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2906 ---- commit 4406ee68b0b12039f3ee2a1cc8dcc4469bb639d0 Author: xuchuanyin <xuchuanyin@...> Date: 2018-11-05T07:11:09Z support prefetch for compaction Current compaction performance is low. By adding logs to observe the compaction procedure, we found that in `CarbonFactDataHandlerColumnar.addDataToStore(CarbonRow)`, it will wait about 30ms before submitting a new TablePage producer. Since the method `addDataToStore` is called in single thread, it will result the waiting every 32000 records since it will collect 32000 records to form a TablePage. To reduce the waiting time, we can prepare the 32000 records ahead. This can be achived using prefetch. We will prepare two buffers, one will provide the records to the downstream (`addDataToStore`) and the other one will prepare the records asynchronously. The first is called working buffer and the second is called backup buffer. Once working buffer is exhausted, the two buffers will exchange their roles: the backup buffer will be the new working buffer and the old working buffer will be the new backup buffer and it will be filled asynchronously. Two parameters are involved for this feature: 1. carbon.detail.batch.size: This is an existed parameter and the default value is 100. This parameter controls the batch size of records that return to the client. For normal query, it is OK to keep it as 100. But for compaction, since all the records will be operated, we suggest you to set it to a larger value such as 32000. (32000 is the max rows for a table page that the down stream wants). 2. carbon.compaction.prefetch.enable: This is a new parameter and the default value is `false` (We may change it to `true` later). This parameter controls whether we will prefetch the records for compation. By using this prefetch feature, we can enhance the performance for compaction. More test results can be found in the PR description. ---- --- |
Github user xuchuanyin commented on the issue:
https://github.com/apache/carbondata/pull/2906 Please note that this PR is nearly the modification from PR #2133 plus that we meld the `convertRow` step to the backup buffer procedure. --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2906#discussion_r231492037 --- Diff: core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java --- @@ -40,103 +49,150 @@ */ private CarbonIterator<RowBatch> detailRawQueryResultIterator; - /** - * Counter to maintain the row counter. - */ - private int counter = 0; - - private Object[] currentConveretedRawRow = null; + private boolean prefetchEnabled; + private List<Object[]> currentBuffer; + private List<Object[]> backupBuffer; + private int currentIdxInBuffer; + private ExecutorService executorService; + private Future<Void> fetchFuture; + private Object[] currentRawRow = null; + private boolean isBackupFilled = false; /** * LOGGER */ private static final Logger LOGGER = LogServiceFactory.getLogService(RawResultIterator.class.getName()); - /** - * batch of the result. - */ - private RowBatch batch; - public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator, - SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) { + SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties, + boolean isStreamingHandoff) { this.detailRawQueryResultIterator = detailRawQueryResultIterator; this.sourceSegProperties = sourceSegProperties; this.destinationSegProperties = destinationSegProperties; + this.executorService = Executors.newFixedThreadPool(1); + + if (!isStreamingHandoff) { + init(); + } } - @Override - public boolean hasNext() { - if (null == batch || checkIfBatchIsProcessedCompletely(batch)) { - if (detailRawQueryResultIterator.hasNext()) { - batch = null; - batch = detailRawQueryResultIterator.next(); - counter = 0; // batch changed so reset the counter. + private void init() { + this.prefetchEnabled = CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE, + CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true"); + try { + new RowsFetcher(false).call(); + if (prefetchEnabled) { + this.fetchFuture = executorService.submit(new RowsFetcher(true)); + } + } catch (Exception e) { + LOGGER.error("Error occurs while fetching records", e); + throw new RuntimeException(e); + } + } + + /** + * fetch rows + */ + private final class RowsFetcher implements Callable<Void> { + private boolean isBackupFilling; + + private RowsFetcher(boolean isBackupFilling) { + this.isBackupFilling = isBackupFilling; + } + + @Override + public Void call() throws Exception { + if (isBackupFilling) { + backupBuffer = fetchRows(); + isBackupFilled = true; } else { - return false; + currentBuffer = fetchRows(); } + return null; } - if (!checkIfBatchIsProcessedCompletely(batch)) { - return true; + } + + private List<Object[]> fetchRows() throws Exception { + List<Object[]> converted = new ArrayList<>(); + if (detailRawQueryResultIterator.hasNext()) { + for (Object[] r : detailRawQueryResultIterator.next().getRows()) { + converted.add(convertRow(r)); --- End diff -- FYI: This is the key difference with PR #2133 --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2906 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1325/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2906 Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1536/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2906 Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9584/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2906 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1326/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2906 Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9585/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2906 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1537/ --- |
In reply to this post by qiuchenjian-2
Github user Sssan520 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2906#discussion_r231738236 --- Diff: core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java --- @@ -40,103 +49,150 @@ */ private CarbonIterator<RowBatch> detailRawQueryResultIterator; - /** - * Counter to maintain the row counter. - */ - private int counter = 0; - - private Object[] currentConveretedRawRow = null; + private boolean prefetchEnabled; + private List<Object[]> currentBuffer; + private List<Object[]> backupBuffer; + private int currentIdxInBuffer; + private ExecutorService executorService; + private Future<Void> fetchFuture; + private Object[] currentRawRow = null; + private boolean isBackupFilled = false; /** * LOGGER */ private static final Logger LOGGER = LogServiceFactory.getLogService(RawResultIterator.class.getName()); - /** - * batch of the result. - */ - private RowBatch batch; - public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator, - SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) { + SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties, + boolean isStreamingHandoff) { this.detailRawQueryResultIterator = detailRawQueryResultIterator; this.sourceSegProperties = sourceSegProperties; this.destinationSegProperties = destinationSegProperties; + this.executorService = Executors.newFixedThreadPool(1); + + if (!isStreamingHandoff) { + init(); + } } - @Override - public boolean hasNext() { - if (null == batch || checkIfBatchIsProcessedCompletely(batch)) { - if (detailRawQueryResultIterator.hasNext()) { - batch = null; - batch = detailRawQueryResultIterator.next(); - counter = 0; // batch changed so reset the counter. + private void init() { + this.prefetchEnabled = CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE, + CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true"); + try { + new RowsFetcher(false).call(); + if (prefetchEnabled) { + this.fetchFuture = executorService.submit(new RowsFetcher(true)); + } + } catch (Exception e) { + LOGGER.error("Error occurs while fetching records", e); + throw new RuntimeException(e); + } + } + + /** + * fetch rows + */ + private final class RowsFetcher implements Callable<Void> { + private boolean isBackupFilling; + + private RowsFetcher(boolean isBackupFilling) { + this.isBackupFilling = isBackupFilling; + } + + @Override + public Void call() throws Exception { + if (isBackupFilling) { + backupBuffer = fetchRows(); + isBackupFilled = true; } else { - return false; + currentBuffer = fetchRows(); } + return null; } - if (!checkIfBatchIsProcessedCompletely(batch)) { - return true; + } + + private List<Object[]> fetchRows() throws Exception { + List<Object[]> converted = new ArrayList<>(); + if (detailRawQueryResultIterator.hasNext()) { + for (Object[] r : detailRawQueryResultIterator.next().getRows()) { + converted.add(convertRow(r)); + } + return converted; } else { - return false; + return new ArrayList<>(); --- End diff -- since object converted has been initialized, if it has no data to add, can return it. --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2906#discussion_r231755219 --- Diff: core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java --- @@ -40,103 +49,150 @@ */ private CarbonIterator<RowBatch> detailRawQueryResultIterator; - /** - * Counter to maintain the row counter. - */ - private int counter = 0; - - private Object[] currentConveretedRawRow = null; + private boolean prefetchEnabled; + private List<Object[]> currentBuffer; + private List<Object[]> backupBuffer; + private int currentIdxInBuffer; + private ExecutorService executorService; + private Future<Void> fetchFuture; + private Object[] currentRawRow = null; + private boolean isBackupFilled = false; /** * LOGGER */ private static final Logger LOGGER = LogServiceFactory.getLogService(RawResultIterator.class.getName()); - /** - * batch of the result. - */ - private RowBatch batch; - public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator, - SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) { + SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties, + boolean isStreamingHandoff) { this.detailRawQueryResultIterator = detailRawQueryResultIterator; this.sourceSegProperties = sourceSegProperties; this.destinationSegProperties = destinationSegProperties; + this.executorService = Executors.newFixedThreadPool(1); + + if (!isStreamingHandoff) { + init(); + } } - @Override - public boolean hasNext() { - if (null == batch || checkIfBatchIsProcessedCompletely(batch)) { - if (detailRawQueryResultIterator.hasNext()) { - batch = null; - batch = detailRawQueryResultIterator.next(); - counter = 0; // batch changed so reset the counter. + private void init() { + this.prefetchEnabled = CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE, + CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true"); + try { + new RowsFetcher(false).call(); + if (prefetchEnabled) { + this.fetchFuture = executorService.submit(new RowsFetcher(true)); + } + } catch (Exception e) { + LOGGER.error("Error occurs while fetching records", e); + throw new RuntimeException(e); + } + } + + /** + * fetch rows + */ + private final class RowsFetcher implements Callable<Void> { + private boolean isBackupFilling; + + private RowsFetcher(boolean isBackupFilling) { + this.isBackupFilling = isBackupFilling; + } + + @Override + public Void call() throws Exception { + if (isBackupFilling) { + backupBuffer = fetchRows(); + isBackupFilled = true; } else { - return false; + currentBuffer = fetchRows(); } + return null; } - if (!checkIfBatchIsProcessedCompletely(batch)) { - return true; + } + + private List<Object[]> fetchRows() throws Exception { + List<Object[]> converted = new ArrayList<>(); + if (detailRawQueryResultIterator.hasNext()) { + for (Object[] r : detailRawQueryResultIterator.next().getRows()) { + converted.add(convertRow(r)); + } + return converted; } else { - return false; + return new ArrayList<>(); --- End diff -- fine~ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2906 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1328/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2906 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1539/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2906 Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9587/ --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on the issue:
https://github.com/apache/carbondata/pull/2906 LGTM --- |
In reply to this post by qiuchenjian-2
|
Free forum by Nabble | Edit this page |