Github user xubo245 commented on the issue:
https://github.com/apache/carbondata/pull/2290

Please help review it. @jackylk @ravipesala @QiangCai

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2290#discussion_r191298660

--- Diff: store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---
@@ -158,22 +163,36 @@ private QueryModel prune(int queryId, CarbonTable table, QueryModel queryModel,
       CarbonMultiBlockSplit mbSplit, DataMapExprWrapper datamap) throws IOException {
     Objects.requireNonNull(datamap);
     List<Segment> segments = new LinkedList<>();
+    HashMap<String, Integer> uniqueSegments = new HashMap<>();
     for (CarbonInputSplit split : mbSplit.getAllSplits()) {
-      segments.add(
-          Segment.toSegment(split.getSegmentId(),
-              new LatestFilesReadCommittedScope(table.getTablePath())));
+      String segmentId = split.getSegmentId();
+      if (uniqueSegments.get(segmentId) == null) {
+        segments.add(Segment.toSegment(
+            segmentId,
+            new LatestFilesReadCommittedScope(table.getTablePath(), segmentId)));
+        uniqueSegments.put(segmentId, 1);
+      } else {
+        uniqueSegments.put(segmentId, uniqueSegments.get(segmentId) + 1);
+      }
+    }
+
+    List<DataMapDistributableWrapper> distributables = datamap.toDistributable(segments);
+    List<ExtendedBlocklet> prunnedBlocklets = new LinkedList<ExtendedBlocklet>();
+    for (int i = 0; i < distributables.size(); i++) {
+      DataMapDistributable dataMapDistributable = distributables.get(i).getDistributable();
+      prunnedBlocklets.addAll(datamap.prune(dataMapDistributable, null));
     }
-    List<ExtendedBlocklet> prunnedBlocklets = datamap.prune(segments, null);
-    List<String> pathToRead = new LinkedList<>();
-    for (ExtendedBlocklet prunnedBlocklet : prunnedBlocklets) {
-      pathToRead.add(prunnedBlocklet.getPath());
+    HashMap<String, ExtendedBlocklet> pathToRead = new HashMap<>();
+    for (ExtendedBlocklet prunedBlocklet : prunnedBlocklets) {
+      pathToRead.put(prunedBlocklet.getFilePath(), prunedBlocklet);
     }
     List<TableBlockInfo> blocks = queryModel.getTableBlockInfos();
     List<TableBlockInfo> blockToRead = new LinkedList<>();
     for (TableBlockInfo block : blocks) {
-      if (pathToRead.contains(block.getFilePath())) {
+      if (pathToRead.keySet().contains(block.getFilePath())) {
+        block.setDataMapWriterPath(pathToRead.get(block.getFilePath()).getDataMapWriterPath());
--- End diff --

Why does this need to be set?

---
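For readers following the hunk above: the uniqueSegments map is only ever tested for key presence, and the per-segment counts are written but never read. A minimal, self-contained sketch of the same dedup pattern using a Set (the class name and sample data are illustrative, not from the PR):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Set;

    public class SegmentDedupSketch {
      public static void main(String[] args) {
        // Stand-in for the segment ids returned by mbSplit.getAllSplits()
        List<String> idsFromSplits = Arrays.asList("0", "1", "0", "2", "1");

        Set<String> seen = new HashSet<>();
        List<String> uniqueSegmentIds = new LinkedList<>();
        for (String segmentId : idsFromSplits) {
          // Set.add returns false when the id is already present,
          // so each segment is materialized exactly once
          if (seen.add(segmentId)) {
            uniqueSegmentIds.add(segmentId);
          }
        }
        System.out.println(uniqueSegmentIds); // prints [0, 1, 2]
      }
    }

---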
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2290#discussion_r191298867

--- Diff: store/search/src/main/scala/org/apache/spark/rpc/Master.scala ---
@@ -142,7 +142,7 @@ class Master(sparkConf: SparkConf) {
       (address, schedulable.ref.ask[ShutdownResponse](ShutdownRequest("user")))
     }
     futures.foreach { case (address, future) =>
-      ThreadUtils.awaitResult(future, Duration.apply("10s"))
+      ThreadUtils.awaitResult(future, Duration.apply("100s"))
--- End diff --

Please add a CarbonProperty for the search-mode timeout, and make the default 10s.

---
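What the review asks for is the standard CarbonProperties pattern: read the timeout from a configurable property with a 10-second default rather than hard-coding it. A hedged sketch (the property key below is hypothetical; the constant the PR eventually adds may be named differently):

    import org.apache.carbondata.core.util.CarbonProperties;

    // Hypothetical key; check CarbonCommonConstants for the name actually added.
    String seconds = CarbonProperties.getInstance()
        .getProperty("carbon.search.master.wait.seconds", "10");
    // The Scala caller can then build the duration from this value,
    // e.g. Duration.apply(seconds + "s"), instead of a literal "10s"/"100s".

---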
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2290#discussion_r191298984

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java ---
@@ -77,8 +77,10 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext context)
     } else {
       throw new RuntimeException("unsupported input split type: " + inputSplit);
     }
-    List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
-    queryModel.setTableBlockInfos(tableBlockInfoList);
+    if (queryModel.getTableBlockInfos().isEmpty()) {
--- End diff --

Please add a comment.

---
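The guard being reviewed prevents overwriting block infos that a caller (such as the search-mode handler discussed above) has already pruned and set on the QueryModel. A sketch of the commented version the reviewer is asking for; the comment wording is illustrative, not the PR's final text:

    // In search mode, pruned block infos are set on the QueryModel before this
    // reader is initialized; only derive them from the splits when none are set.
    if (queryModel.getTableBlockInfos().isEmpty()) {
      List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
      queryModel.setTableBlockInfos(tableBlockInfoList);
    }

---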
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2290#discussion_r191299007

--- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java ---
@@ -235,7 +236,8 @@ public DataMapBuilder createBuilder(Segment segment, String shardName) {
     }
     for (CarbonFile indexDir : indexDirs) {
       // Filter out the tasks which are filtered through CG datamap.
-      if (!segment.getFilteredIndexShardNames().contains(indexDir.getName())) {
+      if (getDataMapLevel() != DataMapLevel.FG &&
+          !segment.getFilteredIndexShardNames().contains(indexDir.getName())) {
--- End diff --

Incorrect indentation.

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2290#discussion_r191299224

--- Diff: core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java ---
@@ -169,6 +169,10 @@ private BlockletScannedResult executeFilter(RawBlockletColumnChunks rawBlockletC
     // apply filter on actual data, for each page
     BitSetGroup bitSetGroup = this.filterExecuter.applyFilter(rawBlockletColumnChunks,
         useBitSetPipeLine);
+    // if bitSetGroup is null, create a new BitSetGroup object, which can avoid NPE
--- End diff --

Please describe why and when it is null.

---
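The shape of the guard under discussion, as a sketch; the BitSetGroup(int) constructor and the numberOfPages() accessor are assumptions based on the surrounding code, and the comment reflects the explanation given later in this thread:

    BitSetGroup bitSetGroup = this.filterExecuter.applyFilter(rawBlockletColumnChunks,
        useBitSetPipeLine);
    // applyFilter can return null when no bitset was produced for these chunks;
    // substitute an empty group sized to the page count so downstream page
    // iteration does not hit a NullPointerException.
    if (bitSetGroup == null) {
      bitSetGroup = new BitSetGroup(rawBlockletColumnChunks.getDataBlock().numberOfPages());
    }

---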
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2290#discussion_r191299753

--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java ---
@@ -96,13 +96,19 @@ private DataMapStoreManager() {
       String dbName = carbonTable.getDatabaseName();
       String tableName = carbonTable.getTableName();
       String dmName = dataMap.getDataMapSchema().getDataMapName();
-      boolean isDmVisible = sessionInfo.getSessionParams().getProperty(
-          String.format("%s%s.%s.%s", CarbonCommonConstants.CARBON_DATAMAP_VISIBLE,
-              dbName, tableName, dmName), "true").trim().equalsIgnoreCase("true");
-      if (!isDmVisible) {
-        LOGGER.warn(String.format("Ignore invisible datamap %s on table %s.%s",
-            dmName, dbName, tableName));
-        dataMapIterator.remove();
+      if (sessionInfo != null) {
+        boolean isDmVisible = sessionInfo.getSessionParams().getProperty(
+            String.format("%s%s.%s.%s", CarbonCommonConstants.CARBON_DATAMAP_VISIBLE,
+                dbName, tableName, dmName), "true").trim().equalsIgnoreCase("true");
+        if (!isDmVisible) {
+          LOGGER.warn(String.format("Ignore invisible datamap %s on table %s.%s",
+              dmName, dbName, tableName));
+          dataMapIterator.remove();
+        }
+      } else {
+        // TODO: need to support getting the visible status of datamap in the future
+        String message = "Carbon session info is null";
+        LOGGER.audit(message);
--- End diff --

Use info instead of audit; audit is intended for table operations.

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2290#discussion_r191300111

--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java ---
@@ -96,13 +96,19 @@ private DataMapStoreManager() {
       String dbName = carbonTable.getDatabaseName();
       String tableName = carbonTable.getTableName();
       String dmName = dataMap.getDataMapSchema().getDataMapName();
-      boolean isDmVisible = sessionInfo.getSessionParams().getProperty(
-          String.format("%s%s.%s.%s", CarbonCommonConstants.CARBON_DATAMAP_VISIBLE,
-              dbName, tableName, dmName), "true").trim().equalsIgnoreCase("true");
-      if (!isDmVisible) {
-        LOGGER.warn(String.format("Ignore invisible datamap %s on table %s.%s",
-            dmName, dbName, tableName));
-        dataMapIterator.remove();
+      if (sessionInfo != null) {
--- End diff --

Please add a comment describing why this check is needed.

---
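A sketch of the commented guard being requested; the comment text is illustrative of why sessionInfo can be null (for example, when no user session is attached in search mode), not the PR's final wording:

    // sessionInfo is null when this code runs outside a user session (e.g. on
    // a search-mode worker), so datamap visibility cannot be resolved from
    // session parameters; keep all datamaps and log at info level instead.
    if (sessionInfo != null) {
      // ... resolve visibility from session params as in the hunk above ...
    } else {
      LOGGER.info("Carbon session info is null, skipping datamap visibility check");
    }

---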
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2290#discussion_r191300474

--- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java ---
@@ -42,10 +39,27 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {

   private String carbonFilePath;
+  private String segmentId;
   private ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot;
   private LoadMetadataDetails[] loadMetadataDetails;

-  public LatestFilesReadCommittedScope(String path) {
+  /**
+   * a new constructor of this class, which supports obtaining the lucene index in search mode
+   *
+   * @param path carbon file path
+   * @param segmentId segment id
+   */
+  public LatestFilesReadCommittedScope(String path, String segmentId) {
+    this.carbonFilePath = path;
+    this.segmentId = segmentId;
+    try {
+      takeCarbonIndexFileSnapShot();
+    } catch (IOException ex) {
+      throw new RuntimeException("Error while taking index snapshot", ex);
+    }
+  }
+
+  public LatestFilesReadCommittedScope(String path) {
--- End diff --

This can call the constructor above.

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2290#discussion_r191300521

--- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java ---
@@ -42,10 +39,27 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {

   private String carbonFilePath;
+  private String segmentId;
   private ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot;
   private LoadMetadataDetails[] loadMetadataDetails;

-  public LatestFilesReadCommittedScope(String path) {
+  /**
+   * a new constructor of this class, which supports obtaining the lucene index in search mode
--- End diff --

Better not to mention search mode; describe what this constructor does.

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2290#discussion_r191300560

--- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java ---
@@ -42,10 +39,27 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {

   private String carbonFilePath;
+  private String segmentId;
   private ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot;
   private LoadMetadataDetails[] loadMetadataDetails;

-  public LatestFilesReadCommittedScope(String path) {
+  /**
+   * a new constructor of this class, which supports obtaining the lucene index in search mode
+   *
+   * @param path carbon file path
+   * @param segmentId segment id
+   */
+  public LatestFilesReadCommittedScope(String path, String segmentId) {
+    this.carbonFilePath = path;
--- End diff --

Add `Objects.requireNonNull` for path. And can segmentId be null?

---
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2290#discussion_r191362538

--- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java ---
@@ -40,9 +40,26 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {

   private String carbonFilePath;
+  private String segmentId;
   private ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot;
   private LoadMetadataDetails[] loadMetadataDetails;

+  /**
+   * a new constructor of this class, which supports obtaining the lucene index in search mode
+   *
+   * @param path carbon file path
+   * @param segmentId segment id
+   */
+  public LatestFilesReadCommittedScope(String path, String segmentId) {
--- End diff --

I changed the constructor to:

    public LatestFilesReadCommittedScope(String path) {
      this(path, null);
    }

---
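Putting the review points together (chain the constructors, validate path, allow a null segmentId), the pair could look like the following sketch; this is one way to combine the suggestions, not necessarily the merged code:

    public LatestFilesReadCommittedScope(String path, String segmentId) {
      Objects.requireNonNull(path, "path must not be null");
      this.carbonFilePath = path;
      this.segmentId = segmentId; // null means: take a snapshot of all segments
      try {
        takeCarbonIndexFileSnapShot();
      } catch (IOException ex) {
        throw new RuntimeException("Error while taking index snapshot", ex);
      }
    }

    public LatestFilesReadCommittedScope(String path) {
      this(path, null); // delegate, preserving the old single-argument behaviour
    }

---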
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2290#discussion_r191417277

--- Diff: store/search/src/main/scala/org/apache/spark/rpc/Master.scala ---
@@ -142,7 +142,7 @@ class Master(sparkConf: SparkConf) {
       (address, schedulable.ref.ask[ShutdownResponse](ShutdownRequest("user")))
     }
     futures.foreach { case (address, future) =>
-      ThreadUtils.awaitResult(future, Duration.apply("10s"))
+      ThreadUtils.awaitResult(future, Duration.apply("100s"))
--- End diff --

OK, done. The default value is 10s.

---
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2290#discussion_r191432727

--- Diff: store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---
@@ -158,22 +163,36 @@ private QueryModel prune(int queryId, CarbonTable table, QueryModel queryModel,
       CarbonMultiBlockSplit mbSplit, DataMapExprWrapper datamap) throws IOException {
     Objects.requireNonNull(datamap);
     List<Segment> segments = new LinkedList<>();
+    HashMap<String, Integer> uniqueSegments = new HashMap<>();
     for (CarbonInputSplit split : mbSplit.getAllSplits()) {
-      segments.add(
-          Segment.toSegment(split.getSegmentId(),
-              new LatestFilesReadCommittedScope(table.getTablePath())));
+      String segmentId = split.getSegmentId();
+      if (uniqueSegments.get(segmentId) == null) {
+        segments.add(Segment.toSegment(
+            segmentId,
+            new LatestFilesReadCommittedScope(table.getTablePath(), segmentId)));
+        uniqueSegments.put(segmentId, 1);
+      } else {
+        uniqueSegments.put(segmentId, uniqueSegments.get(segmentId) + 1);
+      }
+    }
+
+    List<DataMapDistributableWrapper> distributables = datamap.toDistributable(segments);
+    List<ExtendedBlocklet> prunnedBlocklets = new LinkedList<ExtendedBlocklet>();
+    for (int i = 0; i < distributables.size(); i++) {
+      DataMapDistributable dataMapDistributable = distributables.get(i).getDistributable();
+      prunnedBlocklets.addAll(datamap.prune(dataMapDistributable, null));
     }
-    List<ExtendedBlocklet> prunnedBlocklets = datamap.prune(segments, null);
-    List<String> pathToRead = new LinkedList<>();
-    for (ExtendedBlocklet prunnedBlocklet : prunnedBlocklets) {
-      pathToRead.add(prunnedBlocklet.getPath());
+    HashMap<String, ExtendedBlocklet> pathToRead = new HashMap<>();
+    for (ExtendedBlocklet prunedBlocklet : prunnedBlocklets) {
+      pathToRead.put(prunedBlocklet.getFilePath(), prunedBlocklet);
     }
     List<TableBlockInfo> blocks = queryModel.getTableBlockInfos();
     List<TableBlockInfo> blockToRead = new LinkedList<>();
     for (TableBlockInfo block : blocks) {
-      if (pathToRead.contains(block.getFilePath())) {
+      if (pathToRead.keySet().contains(block.getFilePath())) {
+        block.setDataMapWriterPath(pathToRead.get(block.getFilePath()).getDataMapWriterPath());
--- End diff --

If this is not set, the bitSetGroup will be null (for the reason, refer to org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNode#getIndexedData), and an exception will then be thrown in org.apache.carbondata.core.scan.filter.executer.RowLevelFilterExecuterImpl#applyFilter(org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks, boolean).

---
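Two asides on the hunk above. First, `pathToRead.keySet().contains(...)` can simply be `pathToRead.containsKey(...)`, which states the intent directly. Second, the explanation just given can live as a comment at the call site. A sketch combining both (the blockToRead.add line is implied by the surrounding code, not shown in the hunk):

    for (TableBlockInfo block : blocks) {
      if (pathToRead.containsKey(block.getFilePath())) {
        // Propagate the datamap writer path so BlockletDataRefNode#getIndexedData
        // can build a non-null BitSetGroup for RowLevelFilterExecuterImpl#applyFilter.
        block.setDataMapWriterPath(pathToRead.get(block.getFilePath()).getDataMapWriterPath());
        blockToRead.add(block);
      }
    }

---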
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2290#discussion_r191442174

--- Diff: core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java ---
@@ -169,6 +169,10 @@ private BlockletScannedResult executeFilter(RawBlockletColumnChunks rawBlockletC
     // apply filter on actual data, for each page
     BitSetGroup bitSetGroup = this.filterExecuter.applyFilter(rawBlockletColumnChunks,
         useBitSetPipeLine);
+    // if bitSetGroup is null, create a new BitSetGroup object, which can avoid NPE
--- End diff --

Before PR 2242, it would be null when the bitSetGroup of rawBlockletColumnChunks was null; so we can remove this now.

---
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2290#discussion_r191447282

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java ---
@@ -77,8 +77,10 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext context)
     } else {
       throw new RuntimeException("unsupported input split type: " + inputSplit);
     }
-    List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
-    queryModel.setTableBlockInfos(tableBlockInfoList);
+    if (queryModel.getTableBlockInfos().isEmpty()) {
--- End diff --

OK, done.

---
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2290#discussion_r191447507

--- Diff: datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java ---
@@ -235,7 +236,8 @@ public DataMapBuilder createBuilder(Segment segment, String shardName) {
     }
     for (CarbonFile indexDir : indexDirs) {
       // Filter out the tasks which are filtered through CG datamap.
-      if (!segment.getFilteredIndexShardNames().contains(indexDir.getName())) {
+      if (getDataMapLevel() != DataMapLevel.FG &&
+          !segment.getFilteredIndexShardNames().contains(indexDir.getName())) {
--- End diff --

OK, done.

---
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2290#discussion_r191447617

--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java ---
@@ -96,13 +96,19 @@ private DataMapStoreManager() {
       String dbName = carbonTable.getDatabaseName();
       String tableName = carbonTable.getTableName();
      String dmName = dataMap.getDataMapSchema().getDataMapName();
-      boolean isDmVisible = sessionInfo.getSessionParams().getProperty(
-          String.format("%s%s.%s.%s", CarbonCommonConstants.CARBON_DATAMAP_VISIBLE,
-              dbName, tableName, dmName), "true").trim().equalsIgnoreCase("true");
-      if (!isDmVisible) {
-        LOGGER.warn(String.format("Ignore invisible datamap %s on table %s.%s",
-            dmName, dbName, tableName));
-        dataMapIterator.remove();
+      if (sessionInfo != null) {
+        boolean isDmVisible = sessionInfo.getSessionParams().getProperty(
+            String.format("%s%s.%s.%s", CarbonCommonConstants.CARBON_DATAMAP_VISIBLE,
+                dbName, tableName, dmName), "true").trim().equalsIgnoreCase("true");
+        if (!isDmVisible) {
+          LOGGER.warn(String.format("Ignore invisible datamap %s on table %s.%s",
+              dmName, dbName, tableName));
+          dataMapIterator.remove();
+        }
+      } else {
+        // TODO: need to support getting the visible status of datamap in the future
+        String message = "Carbon session info is null";
+        LOGGER.audit(message);
--- End diff --

OK, done.

---
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2290#discussion_r191448581

--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java ---
@@ -96,13 +96,19 @@ private DataMapStoreManager() {
       String dbName = carbonTable.getDatabaseName();
       String tableName = carbonTable.getTableName();
       String dmName = dataMap.getDataMapSchema().getDataMapName();
-      boolean isDmVisible = sessionInfo.getSessionParams().getProperty(
-          String.format("%s%s.%s.%s", CarbonCommonConstants.CARBON_DATAMAP_VISIBLE,
-              dbName, tableName, dmName), "true").trim().equalsIgnoreCase("true");
-      if (!isDmVisible) {
-        LOGGER.warn(String.format("Ignore invisible datamap %s on table %s.%s",
-            dmName, dbName, tableName));
-        dataMapIterator.remove();
+      if (sessionInfo != null) {
--- End diff --

OK, done.

---
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2290#discussion_r191450160

--- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java ---
@@ -42,10 +39,27 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {

   private String carbonFilePath;
+  private String segmentId;
   private ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot;
   private LoadMetadataDetails[] loadMetadataDetails;

-  public LatestFilesReadCommittedScope(String path) {
+  /**
+   * a new constructor of this class, which supports obtaining the lucene index in search mode
+   *
+   * @param path carbon file path
+   * @param segmentId segment id
+   */
+  public LatestFilesReadCommittedScope(String path, String segmentId) {
+    this.carbonFilePath = path;
+    this.segmentId = segmentId;
+    try {
+      takeCarbonIndexFileSnapShot();
+    } catch (IOException ex) {
+      throw new RuntimeException("Error while taking index snapshot", ex);
+    }
+  }
+
+  public LatestFilesReadCommittedScope(String path) {
--- End diff --

OK, done.

---