GitHub user kevinjmh opened a pull request:
https://github.com/apache/carbondata/pull/2713 [CARBONDATA-2931][BloomDataMap] Optimize bloom datamap pruning 1. re-use shard pruning info from default datamap 2. create one BloomCoarseGrainDataMap object per segment instead of per shard. (This is also preparation for parallel segment pruning) Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily: - [ ] Any interfaces changed? - [ ] Any backward compatibility impacted? - [ ] Document update required? - [ ] Testing done Please provide details on - Whether new unit test cases have been added or why no new tests are required? - How it is tested? Please attach test report. - Is it a performance related change? Please attach the performance test report. - Any additional information to help reviewers in testing this change. - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. You can merge this pull request into a Git repository by running: $ git pull https://github.com/kevinjmh/carbondata bloom_shard_op Alternatively you can review and apply these changes as the patch at: https://github.com/apache/carbondata/pull/2713.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2713 ---- commit 8544ed699424ff6089d28b8e07f582a2f9e25b78 Author: Manhua <kevinjmh@...> Date: 2018-08-30T01:43:10Z all shard of one segment use one datamap ---- --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2713 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/257/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2713 Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/426/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2713 Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8496/ --- |
In reply to this post by qiuchenjian-2
Github user kevinjmh commented on the issue:
https://github.com/apache/carbondata/pull/2713 retest this please --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2713 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/262/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2713 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/431/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2713 Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8501/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2713 Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/8778/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2713 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/711/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2713 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/533/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2713 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/734/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2713 Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/8999/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2713 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/931/ --- |
In reply to this post by qiuchenjian-2
Github user qiuchenjian commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2713#discussion_r242387613 --- Diff: datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java --- @@ -218,56 +218,46 @@ public DataMapBuilder createBuilder(Segment segment, String shardName, this.bloomFilterSize, this.bloomFilterFpp, bloomCompress); } - /** - * returns all shard directories of bloom index files for query - * if bloom index files are merged we should get only one shard path - */ - private Set<String> getAllShardPaths(String tablePath, String segmentId) { - String dataMapStorePath = CarbonTablePath.getDataMapStorePath( - tablePath, segmentId, dataMapName); - CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dataMapStorePath).listFiles(); - Set<String> shardPaths = new HashSet<>(); + + private boolean isAllShardsMerged(String dmSegmentPath) { + boolean mergeShardExist = false; boolean mergeShardInprogress = false; - CarbonFile mergeShardFile = null; + CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dmSegmentPath).listFiles(); for (CarbonFile carbonFile : carbonFiles) { - if (carbonFile.getName().equals(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME)) { - mergeShardFile = carbonFile; - } else if (carbonFile.getName().equals(BloomIndexFileStore.MERGE_INPROGRESS_FILE)) { + String fileName = carbonFile.getName(); + if (fileName.equals(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME)) { + mergeShardExist = true; + } else if (fileName.equals(BloomIndexFileStore.MERGE_INPROGRESS_FILE)) { mergeShardInprogress = true; - } else if (carbonFile.isDirectory()) { - shardPaths.add(FileFactory.getPath(carbonFile.getAbsolutePath()).toString()); } } - if (mergeShardFile != null && !mergeShardInprogress) { - // should only get one shard path if mergeShard is generated successfully - shardPaths.clear(); - shardPaths.add(FileFactory.getPath(mergeShardFile.getAbsolutePath()).toString()); - } - return shardPaths; + return mergeShardExist && !mergeShardInprogress; } @Override public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException { List<CoarseGrainDataMap> dataMaps = new ArrayList<>(); try { - Set<String> shardPaths = segmentMap.get(segment.getSegmentNo()); - if (shardPaths == null) { - shardPaths = getAllShardPaths(getCarbonTable().getTablePath(), segment.getSegmentNo()); - segmentMap.put(segment.getSegmentNo(), shardPaths); - } - Set<String> filteredShards = segment.getFilteredIndexShardNames(); - for (String shard : shardPaths) { - if (shard.endsWith(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME) || - filteredShards.contains(new File(shard).getName())) { - // Filter out the tasks which are filtered through Main datamap. - // for merge shard, shard pruning delay to be done before pruning blocklet - BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap(); - bloomDM.init(new BloomDataMapModel(shard, cache, segment.getConfiguration())); - bloomDM.initIndexColumnConverters(getCarbonTable(), dataMapMeta.getIndexedColumns()); - bloomDM.setFilteredShard(filteredShards); - dataMaps.add(bloomDM); - } + String dmSegmentPath = CarbonTablePath.getDataMapStorePath( + getCarbonTable().getTablePath(), segment.getSegmentNo(), dataMapName); + boolean useMergeShard = isAllShardsMerged(dmSegmentPath); + + // make use of filtered shard info from default datamap to build bloom datamap + BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap(); + bloomDM.init(new BloomDataMapModel(dmSegmentPath, cache, FileFactory.getConfiguration())); + bloomDM.initIndexColumnConverters(getCarbonTable(), dataMapMeta.getIndexedColumns()); + bloomDM.setFilteredShard(segment.getFilteredIndexShardNames(), useMergeShard); + dataMaps.add(bloomDM); + + // save shard info for clearing cache + Set<String> shardPaths = new HashSet<>(); + if (useMergeShard) { + shardPaths.add(dmSegmentPath + File.separator + + BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME); + } else { + shardPaths.addAll(segment.getFilteredIndexShardNames()); } + segmentMap.put(segment.getSegmentNo(), shardPaths); --- End diff -- segmentMap is used cache the shardPaths, now it's uselessï¼I don't think it's necessary to get shardPaths it's ok to change segmentMap to a Set that add segment no --- |
In reply to this post by qiuchenjian-2
Github user qiuchenjian commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2713#discussion_r242393730 --- Diff: datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java --- @@ -178,15 +178,9 @@ private String getAncestorTablePath(CarbonTable currentTable) { for (BloomQueryModel bloomQueryModel : bloomQueryModels) { Set<Blocklet> tempHitBlockletsResult = new HashSet<>(); LOGGER.debug("prune blocklet for query: " + bloomQueryModel); - BloomCacheKeyValue.CacheKey cacheKey = new BloomCacheKeyValue.CacheKey( - this.indexPath.toString(), bloomQueryModel.columnName); - BloomCacheKeyValue.CacheValue cacheValue = cache.get(cacheKey); - List<CarbonBloomFilter> bloomIndexList = cacheValue.getBloomFilters(); - for (CarbonBloomFilter bloomFilter : bloomIndexList) { - if (needShardPrune && !filteredShard.contains(bloomFilter.getShardName())) { --- End diff -- Why delete the code of filtererShard.contains(bloomFilter.getShardName()), I think this code can reduce time --- |
In reply to this post by qiuchenjian-2
Github user kevinjmh commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2713#discussion_r242394373 --- Diff: datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java --- @@ -178,15 +178,9 @@ private String getAncestorTablePath(CarbonTable currentTable) { for (BloomQueryModel bloomQueryModel : bloomQueryModels) { Set<Blocklet> tempHitBlockletsResult = new HashSet<>(); LOGGER.debug("prune blocklet for query: " + bloomQueryModel); - BloomCacheKeyValue.CacheKey cacheKey = new BloomCacheKeyValue.CacheKey( - this.indexPath.toString(), bloomQueryModel.columnName); - BloomCacheKeyValue.CacheValue cacheValue = cache.get(cacheKey); - List<CarbonBloomFilter> bloomIndexList = cacheValue.getBloomFilters(); - for (CarbonBloomFilter bloomFilter : bloomIndexList) { - if (needShardPrune && !filteredShard.contains(bloomFilter.getShardName())) { --- End diff -- usage of this info is moved to `getBloomFilters` --- |
In reply to this post by qiuchenjian-2
Github user qiuchenjian commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2713#discussion_r242430992 --- Diff: datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java --- @@ -218,56 +218,46 @@ public DataMapBuilder createBuilder(Segment segment, String shardName, this.bloomFilterSize, this.bloomFilterFpp, bloomCompress); } - /** - * returns all shard directories of bloom index files for query - * if bloom index files are merged we should get only one shard path - */ - private Set<String> getAllShardPaths(String tablePath, String segmentId) { - String dataMapStorePath = CarbonTablePath.getDataMapStorePath( - tablePath, segmentId, dataMapName); - CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dataMapStorePath).listFiles(); - Set<String> shardPaths = new HashSet<>(); + + private boolean isAllShardsMerged(String dmSegmentPath) { + boolean mergeShardExist = false; boolean mergeShardInprogress = false; - CarbonFile mergeShardFile = null; + CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dmSegmentPath).listFiles(); for (CarbonFile carbonFile : carbonFiles) { - if (carbonFile.getName().equals(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME)) { - mergeShardFile = carbonFile; - } else if (carbonFile.getName().equals(BloomIndexFileStore.MERGE_INPROGRESS_FILE)) { + String fileName = carbonFile.getName(); + if (fileName.equals(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME)) { + mergeShardExist = true; + } else if (fileName.equals(BloomIndexFileStore.MERGE_INPROGRESS_FILE)) { mergeShardInprogress = true; --- End diff -- If MERGE_INPROGRESS_FILE exists, shard's index file will be deleted sometime, so this scene need to be focus on, but this question shows up before this PR --- |
In reply to this post by qiuchenjian-2
Github user kevinjmh commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2713#discussion_r242454233 --- Diff: datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java --- @@ -218,56 +218,46 @@ public DataMapBuilder createBuilder(Segment segment, String shardName, this.bloomFilterSize, this.bloomFilterFpp, bloomCompress); } - /** - * returns all shard directories of bloom index files for query - * if bloom index files are merged we should get only one shard path - */ - private Set<String> getAllShardPaths(String tablePath, String segmentId) { - String dataMapStorePath = CarbonTablePath.getDataMapStorePath( - tablePath, segmentId, dataMapName); - CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dataMapStorePath).listFiles(); - Set<String> shardPaths = new HashSet<>(); + + private boolean isAllShardsMerged(String dmSegmentPath) { + boolean mergeShardExist = false; boolean mergeShardInprogress = false; - CarbonFile mergeShardFile = null; + CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dmSegmentPath).listFiles(); for (CarbonFile carbonFile : carbonFiles) { - if (carbonFile.getName().equals(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME)) { - mergeShardFile = carbonFile; - } else if (carbonFile.getName().equals(BloomIndexFileStore.MERGE_INPROGRESS_FILE)) { + String fileName = carbonFile.getName(); + if (fileName.equals(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME)) { + mergeShardExist = true; + } else if (fileName.equals(BloomIndexFileStore.MERGE_INPROGRESS_FILE)) { mergeShardInprogress = true; --- End diff -- Yes, you are right. We need to fix this. If we allow to use bloom filter when the index files are merging, maybe any IO Exception will occur in following steps when the merging is done. Some simple ideas for this: 1. datamap do not choose bloom when merging is under action 2. change the pruning logic to segment independent, any datamap excepts default datamap can reject or fail the segment pruning ( by return null or ?), and no more result blocklet intersection for this datamap, such that this does not affect final result --- |
In reply to this post by qiuchenjian-2
Github user kevinjmh commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2713#discussion_r242507682 --- Diff: datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java --- @@ -218,56 +218,46 @@ public DataMapBuilder createBuilder(Segment segment, String shardName, this.bloomFilterSize, this.bloomFilterFpp, bloomCompress); } - /** - * returns all shard directories of bloom index files for query - * if bloom index files are merged we should get only one shard path - */ - private Set<String> getAllShardPaths(String tablePath, String segmentId) { - String dataMapStorePath = CarbonTablePath.getDataMapStorePath( - tablePath, segmentId, dataMapName); - CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dataMapStorePath).listFiles(); - Set<String> shardPaths = new HashSet<>(); + + private boolean isAllShardsMerged(String dmSegmentPath) { + boolean mergeShardExist = false; boolean mergeShardInprogress = false; - CarbonFile mergeShardFile = null; + CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dmSegmentPath).listFiles(); for (CarbonFile carbonFile : carbonFiles) { - if (carbonFile.getName().equals(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME)) { - mergeShardFile = carbonFile; - } else if (carbonFile.getName().equals(BloomIndexFileStore.MERGE_INPROGRESS_FILE)) { + String fileName = carbonFile.getName(); + if (fileName.equals(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME)) { + mergeShardExist = true; + } else if (fileName.equals(BloomIndexFileStore.MERGE_INPROGRESS_FILE)) { mergeShardInprogress = true; --- End diff -- One more idea is that we can delay the deletion of original shards in query, referring to segment management. That is when mergeShard exists and no merge inprogress file in a query, we can assure to delete original shards safely. --- |
Free forum by Nabble | Edit this page |