GitHub user manishgupta88 opened a pull request:
https://github.com/apache/carbondata/pull/2191

[WIP] Improve compaction performance

Problem: Compaction performance is slow compared to data load.

Analysis:
1. During compaction, result filling is done in row format. Because of this, the dimension and measure data filling time increases as the number of columns grows: with row-wise filling we cannot take advantage of the OS's cacheable buffers, since we continuously switch to reading data for the next column.
2. Compaction uses a page-level reader flow in which both IO and uncompression are done at page level, so IO and uncompression time increase in this model.

Solution:
1. Implement a columnar-format data-filling structure for the compaction process to fill dimension and measure data.
2. Perform IO at blocklet level and uncompression at page level.

**Note:** **This PR contains the solution for point 1**

 - [ ] Any interfaces changed? No
 - [ ] Any backward compatibility impacted? No
 - [ ] Document update required? No
 - [ ] Testing done: Manually verified
 - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. NA

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/manishgupta88/carbondata compaction_columnar_result_filling

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2191.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2191

----

commit a27928e81621cfa1acb72f6545f222d55c02c3f4
Author: m00258959 <manish.gupta@...>
Date:   2018-04-13T14:45:40Z

    Problem: Compaction performance is slow compared to data load.

    Analysis:
    1. During compaction, result filling is done in row format. Because of this, the dimension and measure data filling time increases as the number of columns grows: with row-wise filling we cannot take advantage of the OS's cacheable buffers, since we continuously switch to reading data for the next column.
    2. Compaction uses a page-level reader flow in which both IO and uncompression are done at page level, so IO and uncompression time increase in this model.

    Solution:
    1. Implement a columnar-format data-filling structure for the compaction process to fill dimension and measure data.
    2. Perform IO at blocklet level and uncompression at page level.

----
---
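[Editorial note] To make the cache-locality argument above concrete, here is a minimal sketch contrasting row-wise and columnar result filling. ColumnChunk and the method names are hypothetical stand-ins, not CarbonData classes; this illustrates the idea behind point 1, not the PR's actual code.

    // Illustration only: ColumnChunk is a hypothetical stand-in for a decoded column page.
    interface ColumnChunk {
      Object getValue(int rowId);
    }

    class ResultFillingSketch {
      // Row-wise filling (the current compaction flow described above): for every row
      // we visit every column in turn, so reads keep alternating between column buffers
      // and OS read-ahead helps little as the column count grows.
      static Object[][] fillRowWise(ColumnChunk[] columns, int rowCount) {
        Object[][] rows = new Object[rowCount][columns.length];
        for (int row = 0; row < rowCount; row++) {
          for (int col = 0; col < columns.length; col++) {
            rows[row][col] = columns[col].getValue(row);
          }
        }
        return rows;
      }

      // Columnar filling (the idea behind point 1): each column is copied in one
      // contiguous pass before moving on to the next column.
      static Object[][] fillColumnWise(ColumnChunk[] columns, int rowCount) {
        Object[][] rows = new Object[rowCount][columns.length];
        for (int col = 0; col < columns.length; col++) {
          for (int row = 0; row < rowCount; row++) {
            rows[row][col] = columns[col].getValue(row);
          }
        }
        return rows;
      }
    }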
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2191 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4021/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2191 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5213/ ---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2191#discussion_r183304163

--- Diff: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---
@@ -1542,7 +1542,8 @@
   public static final String CARBON_ENABLE_PAGE_LEVEL_READER_IN_COMPACTION =
--- End diff --

Please add a @InterfaceStability.Evolving
---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2191#discussion_r183304952

--- Diff: core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java ---
@@ -57,10 +59,14 @@
    */
   protected DimensionInfo dimensionInfo;

-  public AbstractScannedResultCollector(BlockExecutionInfo blockExecutionInfos) {
+  protected QueryStatisticsModel queryStatisticsModel;
--- End diff --

Why is this needed? Can you provide a description in a comment?
---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2191#discussion_r183305545

--- Diff: core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java ---
@@ -150,105 +155,110 @@ private void initCurrentBlockKeyGenerator() {
    * it will keep track of how many record is processed, to handle limit scenario
    */
   @Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) {
+    long startTime = System.currentTimeMillis();
     List<Object[]> listBasedResult = new ArrayList<>(batchSize);
     QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getActualQueryMeasures();
     // scan the record and add to list
-    int rowCounter = 0;
-    while (scannedResult.hasNext() && rowCounter < batchSize) {
-      scanResultAndGetData(scannedResult);
-      if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) {
-        continue;
-      }
-      // re-fill dictionary and no dictionary key arrays for the newly added columns
-      if (dimensionInfo.isDictionaryColumnAdded()) {
-        dictionaryKeyArray = fillDictionaryKeyArrayWithLatestSchema(dictionaryKeyArray);
-      }
-      if (dimensionInfo.isNoDictionaryColumnAdded()) {
-        noDictionaryKeyArray = fillNoDictionaryKeyArrayWithLatestSchema(noDictionaryKeyArray);
-      }
-      prepareRow(scannedResult, listBasedResult, queryMeasures);
-      rowCounter++;
+    scanAndFillData(scannedResult, batchSize, listBasedResult, queryMeasures);
+    // re-fill dictionary and no dictionary key arrays for the newly added columns
+    if (dimensionInfo.isDictionaryColumnAdded()) {
+      fillDictionaryKeyArrayBatchWithLatestSchema(listBasedResult);
+    }
+    if (dimensionInfo.isNoDictionaryColumnAdded()) {
+      fillNoDictionaryKeyArrayBatchWithLatestSchema(listBasedResult);
     }
+    QueryStatistic resultPrepTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.RESULT_PREP_TIME);
+    resultPrepTime.addCountStatistic(QueryStatisticsConstants.RESULT_PREP_TIME,
+        resultPrepTime.getCount() + (System.currentTimeMillis() - startTime));
     return listBasedResult;
   }

   /**
    * This method will fill the dictionary key array with newly added dictionary columns if any
    *
-   * @param dictionaryKeyArray
+   * @param rows
    * @return
    */
-  private byte[] fillDictionaryKeyArrayWithLatestSchema(byte[] dictionaryKeyArray) {
-    QueryDimension[] actualQueryDimensions = tableBlockExecutionInfos.getActualQueryDimensions();
-    int newKeyArrayLength = dimensionInfo.getNewDictionaryColumnCount();
-    long[] keyArray = null;
-    if (null != updatedCurrentBlockKeyGenerator) {
-      keyArray = updatedCurrentBlockKeyGenerator.getKeyArray(dictionaryKeyArray);
-      newKeyArrayLength += keyArray.length;
-    }
-    long[] keyArrayWithNewAddedColumns = new long[newKeyArrayLength];
-    int existingColumnKeyArrayIndex = 0;
-    int newKeyArrayIndex = 0;
-    for (int i = 0; i < dimensionInfo.getDimensionExists().length; i++) {
-      if (CarbonUtil
-          .hasEncoding(actualQueryDimensions[i].getDimension().getEncoder(), Encoding.DICTIONARY)) {
-        // if dimension exists then add the key array value else add the default value
-        if (dimensionInfo.getDimensionExists()[i]) {
-          keyArrayWithNewAddedColumns[newKeyArrayIndex++] = keyArray[existingColumnKeyArrayIndex++];
-        } else {
-          long defaultValueAsLong;
-          Object defaultValue = dimensionInfo.getDefaultValues()[i];
-          if (null != defaultValue) {
-            defaultValueAsLong = ((Integer) defaultValue).longValue();
+  private void fillDictionaryKeyArrayBatchWithLatestSchema(List<Object[]> rows) {
+    for (Object[] row : rows) {
+      ByteArrayWrapper byteArrayWrapper = (ByteArrayWrapper) row[0];
+      byte[] dictKeyArray = byteArrayWrapper.getDictionaryKey();
+      QueryDimension[] actualQueryDimensions = tableBlockExecutionInfos.getActualQueryDimensions();
+      int newKeyArrayLength = dimensionInfo.getNewDictionaryColumnCount();
+      long[] keyArray = null;
+      if (null != updatedCurrentBlockKeyGenerator) {
+        keyArray = updatedCurrentBlockKeyGenerator.getKeyArray(dictKeyArray);
+        newKeyArrayLength += keyArray.length;
+      }
+      long[] keyArrayWithNewAddedColumns = new long[newKeyArrayLength];
+      int existingColumnKeyArrayIndex = 0;
+      int newKeyArrayIndex = 0;
+      for (int i = 0; i < dimensionInfo.getDimensionExists().length; i++) {
+        if (CarbonUtil.hasEncoding(actualQueryDimensions[i].getDimension().getEncoder(),
+            Encoding.DICTIONARY)) {
+          // if dimension exists then add the key array value else add the default value
+          if (dimensionInfo.getDimensionExists()[i]) {
+            keyArrayWithNewAddedColumns[newKeyArrayIndex++] =
+                keyArray[existingColumnKeyArrayIndex++];
           } else {
-            defaultValueAsLong = (long)CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
+            long defaultValueAsLong;
+            Object defaultValue = dimensionInfo.getDefaultValues()[i];
+            if (null != defaultValue) {
+              defaultValueAsLong = ((Integer) defaultValue).longValue();
+            } else {
+              defaultValueAsLong = (long) CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
+            }
+            keyArrayWithNewAddedColumns[newKeyArrayIndex++] = defaultValueAsLong;
-          keyArrayWithNewAddedColumns[newKeyArrayIndex++] = defaultValueAsLong;
          }
        }
+      try {
+        dictKeyArray = restructuredKeyGenerator.generateKey(keyArrayWithNewAddedColumns);
+      } catch (KeyGenException e) {
+        LOGGER.error(e, e.getMessage());
--- End diff --

No need to throw it?
---
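[Editorial note] A minimal sketch of the alternative the reviewer appears to be suggesting, i.e. propagating the failure rather than only logging it. This is an assumption about the suggestion, not code from the PR; the identifiers are the ones visible in the quoted diff.

    try {
      dictKeyArray = restructuredKeyGenerator.generateKey(keyArrayWithNewAddedColumns);
    } catch (KeyGenException e) {
      LOGGER.error(e, e.getMessage());
      // rethrow so a failed key generation does not silently leave the row
      // with a stale dictionary key array
      throw new RuntimeException("Key generation failed while restructuring dictionary keys", e);
    }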
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2191#discussion_r183306059

--- Diff: core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsConstants.java ---
@@ -58,6 +58,14 @@
   String PAGE_SCANNED = "The number of page scanned";

+  String MEASURE_FILLING_TIME = "measure filling time";
--- End diff --

Can you provide a comment? Does "filling" mean copying into the target buffer after the IO read and decoding?
---
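[Editorial note] Assuming the new constant is consumed the same way RESULT_PREP_TIME is consumed in the collectData() diff earlier in this thread, the "filling" time would be accumulated roughly as below. This is a sketch of the likely usage, not code taken from the PR.

    long fillStart = System.currentTimeMillis();
    // ... decode the measure pages and copy the values into the output rows ...
    QueryStatistic measureFillingTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
        .get(QueryStatisticsConstants.MEASURE_FILLING_TIME);
    measureFillingTime.addCountStatistic(QueryStatisticsConstants.MEASURE_FILLING_TIME,
        measureFillingTime.getCount() + (System.currentTimeMillis() - fillStart));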
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2191#discussion_r183342483

--- Diff: core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java ---
@@ -48,6 +53,31 @@ public NonFilterQueryScannedResult(BlockExecutionInfo blockExecutionInfo) {
     return getDictionaryKeyIntegerArray(currentRow);
   }

+  @Override public List<byte[]> getDictionaryKeyArrayBatch(int batchSize) {
+    ++currentRow;
--- End diff --

Is this increment intentional? The increment per batch size is already happening in `RawBasedResultCollector`.
---
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2191#discussion_r183342972

--- Diff: core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java ---
@@ -48,6 +53,31 @@ public NonFilterQueryScannedResult(BlockExecutionInfo blockExecutionInfo) {
     return getDictionaryKeyIntegerArray(currentRow);
   }

+  @Override public List<byte[]> getDictionaryKeyArrayBatch(int batchSize) {
+    ++currentRow;
--- End diff --

Yes, this is intentional. The same increment happens in getDictionaryKeyIntegerArray() as well: the currentRow counter is incremented before fetching the results for the batch because it starts from -1.
---
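[Editorial note] A simplified, hypothetical sketch of the cursor behaviour being discussed (stand-in class and fields, not the CarbonData implementation): the cursor starts at -1, so it is pre-incremented once before a batch is read.

    import java.util.ArrayList;
    import java.util.List;

    class BatchCursorSketch {
      private final byte[][] dictionaryKeys;
      private int currentRow = -1;   // starts before the first row

      BatchCursorSketch(byte[][] dictionaryKeys) {
        this.dictionaryKeys = dictionaryKeys;
      }

      List<byte[]> getDictionaryKeyArrayBatch(int batchSize) {
        ++currentRow;  // pre-increment: cursor now sits on the first unread row
        List<byte[]> batch = new ArrayList<>(batchSize);
        int read = 0;
        while (read < batchSize && currentRow + read < dictionaryKeys.length) {
          batch.add(dictionaryKeys[currentRow + read]);
          read++;
        }
        // leave the cursor on the last row handed out, so the next call's
        // pre-increment lands on the first row of the next batch
        currentRow += Math.max(read - 1, 0);
        return batch;
      }
    }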
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2191 @manishgupta88 I feel you can use `CarbonColumnVectorImpl` here as well and fill the vectors just like the vector reader does; there should not be a separate flow for this again. ---
Github user manishgupta88 commented on the issue:
https://github.com/apache/carbondata/pull/2191 @ravipesala During compaction we retrieve dictionary values in the form of MDKeys and no-dictionary columns as byte arrays. I can make use of CarbonColumnVectorImpl, but in that case I will have to write a new iterator that fills the data column-wise and then combines all dictionary MDKeys into one array and all no-dictionary data into another array, which is the input to CarbonRow. This will again incur the cost of combining the values. Is that ok? ---
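[Editorial note] A rough, hypothetical sketch of the per-row combining step described above (names and types are illustrative, not CarbonData APIs): even if the data were first filled into column vectors, each compaction row would still need its dictionary MDKey bytes concatenated and its no-dictionary values gathered before being handed to CarbonRow.

    class RowStitchSketch {
      // dictColumns[c][row] holds the dictionary key bytes of column c for a row,
      // noDictColumns[c][row] holds the no-dictionary bytes of column c for a row.
      static Object[] stitchRow(byte[][][] dictColumns, byte[][][] noDictColumns, int row) {
        int dictLength = 0;
        for (byte[][] col : dictColumns) {
          dictLength += col[row].length;
        }
        byte[] mdKey = new byte[dictLength];          // combined MDKey for the row
        int offset = 0;
        for (byte[][] col : dictColumns) {
          System.arraycopy(col[row], 0, mdKey, offset, col[row].length);
          offset += col[row].length;
        }
        byte[][] noDictValues = new byte[noDictColumns.length][];
        for (int c = 0; c < noDictColumns.length; c++) {
          noDictValues[c] = noDictColumns[c][row];    // one byte[] per no-dictionary column
        }
        // this per-row combining is the additional cost referred to above
        return new Object[] { mdKey, noDictValues };
      }
    }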
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2191 SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4478/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2191 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5336/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2191 Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4167/ ---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2191 SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4487/ ---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2191 SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4488/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2191 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4179/ ---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2191 @manishgupta88 What I meant is that we already have a flow that fills data in columnar format through vectors. You can make use of the same flow instead of deriving another one. A new flow will take time to stabilize again, and issues would then need to be fixed in two places in the future. ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2191 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/5356/ ---
Github user manishgupta88 commented on the issue:
https://github.com/apache/carbondata/pull/2191 A PR has already been raised against master: PR #2210 ---