Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2644#discussion_r214877591

--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java ---
@@ -360,6 +360,10 @@ private String getKeyUsingTablePath(String tablePath) {
     return null;
   }
 
+  public StreamDataMap getStreamDataMap(CarbonTable table) {
--- End diff --

Please remove this from the DataMap manager if it is not in line with the DataMap interfaces.

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2644#discussion_r214881265

--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/StreamDataMap.java ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.reader.CarbonIndexFileReader;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.format.BlockIndex;
+
+@InterfaceAudience.Internal
+public class StreamDataMap {
+
+  private CarbonTable carbonTable;
+  private FilterExecuter filterExecuter;
+
+  public StreamDataMap(CarbonTable carbonTable) {
+    this.carbonTable = carbonTable;
+  }
+
+  public void init(FilterResolverIntf filterExp) {
+    if (filterExp != null) {
+      // cache all columns
+      List<CarbonColumn> minMaxCacheColumns = new ArrayList<>();
+      for (CarbonDimension dimension : carbonTable.getDimensions()) {
+        if (!dimension.isComplex()) {
+          minMaxCacheColumns.add(dimension);
+        }
+      }
+      minMaxCacheColumns.addAll(carbonTable.getMeasures());
+      // prepare cardinality of all dimensions
+      List<ColumnSchema> listOfColumns =
+          carbonTable.getTableInfo().getFactTable().getListOfColumns();
+      int[] columnCardinality = new int[listOfColumns.size()];
+      for (int index = 0; index < columnCardinality.length; index++) {
+        columnCardinality[index] = Integer.MAX_VALUE;
+      }
+      // initial filter executor
+      SegmentProperties segmentProperties =
+          new SegmentProperties(listOfColumns, columnCardinality);
+      filterExecuter = FilterUtil.getFilterExecuterTree(
+          filterExp, segmentProperties, null, minMaxCacheColumns);
+    }
+  }
+
+  public List<StreamFile> prune(List<Segment> segments) throws IOException {
+    if (filterExecuter == null) {
+      // if filter is null, list all stream files
+      return listAllStreamFiles(segments, false);
+    } else {
+      List<StreamFile> streamFileList = new ArrayList<>();
+      for (StreamFile streamFile : listAllStreamFiles(segments, true)) {
+        if (isScanRequire(streamFile)) {
+          // if stream file is required to scan
+          streamFileList.add(streamFile);
+          streamFile.setMinMaxIndex(null);
+        }
+      }
+      return streamFileList;
+    }
+  }
+
+  private boolean isScanRequire(StreamFile streamFile) {
+    // backward compatibility, old stream file without min/max index
+    if (streamFile.getMinMaxIndex() == null) {
+      return true;
+    }
+    byte[][] maxValue = streamFile.getMinMaxIndex().getMaxValues();
+    byte[][] minValue = streamFile.getMinMaxIndex().getMinValues();
+    BitSet bitSet = filterExecuter.isScanRequired(maxValue, minValue);
+    if (!bitSet.isEmpty()) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  // TODO optimize and move the code to StreamSegment, but it's in the streaming module.
+  private List<StreamFile> listAllStreamFiles(List<Segment> segments, boolean withMinMax)
+      throws IOException {
+    List<StreamFile> streamFileList = new ArrayList<>();
+    for (Segment segment : segments) {
+      String segmentDir = CarbonTablePath.getSegmentPath(
+          carbonTable.getAbsoluteTableIdentifier().getTablePath(), segment.getSegmentNo());
+      FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
+      if (FileFactory.isFileExist(segmentDir, fileType)) {
+        SegmentIndexFileStore segmentIndexFileStore = new SegmentIndexFileStore();
+        segmentIndexFileStore.readAllIIndexOfSegment(segmentDir);
--- End diff --

This involves listing a folder to get the index files. That is a costly operation on the S3 file system, especially as you are not even caching the result. So it is better to fix the carbonindex file name in the streaming case and read it directly instead of listing.

---
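To illustrate the suggested fix: if the streaming writer always emits its index under one well-known file name, the reader can fetch that file with a single call instead of listing the segment directory. The sketch below uses the plain Hadoop FileSystem API; the fixed index name and the helper class are hypothetical, not CarbonData's actual layout:

```java
import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class StreamIndexReadSketch {

  // Hypothetical fixed name; assumes the streaming writer always uses it.
  private static final String STREAM_INDEX_NAME = "stream.carbonindex";

  /**
   * Reads the segment's streaming index directly by its fixed name:
   * one getFileStatus plus one open, instead of a listStatus over the
   * whole segment directory. On object stores such as S3 this replaces
   * a LIST request per segment with a single GET against a known key.
   */
  public static byte[] readStreamIndex(String segmentDir, Configuration conf)
      throws IOException {
    Path indexPath = new Path(segmentDir, STREAM_INDEX_NAME);
    FileSystem fs = indexPath.getFileSystem(conf);
    try {
      FileStatus status = fs.getFileStatus(indexPath); // doubles as the existence check
      byte[] content = new byte[(int) status.getLen()];
      try (FSDataInputStream in = fs.open(indexPath)) {
        in.readFully(0, content);
      }
      return content;
    } catch (FileNotFoundException e) {
      return null; // segment has no streaming index yet; caller skips it
    }
  }
}
```

Caching the parsed index per segment, as the review also hints, would avoid even that single GET on repeated queries.

---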
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2644#discussion_r214882605

--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/StreamFile.java ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
+
+@InterfaceAudience.Internal
+public class StreamFile {
+
+  private String segmentNo;
+
+  private String filePath;
+
+  private long fileSize;
+
+  private BlockletMinMaxIndex minMaxIndex;
+
+  public StreamFile(String segmentNo, String filePath, long fileSize) {
+    this.segmentNo = segmentNo;
+    this.filePath = filePath;
+    this.fileSize = fileSize;
+  }
+
+  public String getSegmentNo() {
+    return segmentNo;
+  }
+
+  public void setSegmentNo(String segmentNo) {
--- End diff --

Please remove unused setters.

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2644#discussion_r214883334

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
@@ -339,63 +338,62 @@ public void refreshSegmentCacheIfRequired(JobContext job, CarbonTable carbonTabl
     return filteredSegmentToAccess;
   }
 
+  public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments,
+      CarbonTable carbonTable) throws IOException {
+    return getSplitsOfStreaming(job, streamSegments, carbonTable, null);
+  }
+
   /**
    * use file list in .carbonindex file to get the split of streaming.
    */
-  public List<InputSplit> getSplitsOfStreaming(JobContext job, AbsoluteTableIdentifier identifier,
-      List<Segment> streamSegments) throws IOException {
+  public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments,
+      CarbonTable carbonTable, FilterResolverIntf filterResolverIntf) throws IOException {
     List<InputSplit> splits = new ArrayList<InputSplit>();
     if (streamSegments != null && !streamSegments.isEmpty()) {
       numStreamSegments = streamSegments.size();
       long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
       long maxSize = getMaxSplitSize(job);
-      for (Segment segment : streamSegments) {
-        String segmentDir =
-            CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
-        FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
-        if (FileFactory.isFileExist(segmentDir, fileType)) {
-          SegmentIndexFileStore segmentIndexFileStore = new SegmentIndexFileStore();
-          segmentIndexFileStore.readAllIIndexOfSegment(segmentDir);
-          Map<String, byte[]> carbonIndexMap = segmentIndexFileStore.getCarbonIndexMap();
-          CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
-          for (byte[] fileData : carbonIndexMap.values()) {
-            indexReader.openThriftReader(fileData);
-            try {
-              // map block index
-              while (indexReader.hasNext()) {
-                BlockIndex blockIndex = indexReader.readBlockIndexInfo();
-                String filePath = segmentDir + File.separator + blockIndex.getFile_name();
-                Path path = new Path(filePath);
-                long length = blockIndex.getFile_size();
-                if (length != 0) {
-                  BlockLocation[] blkLocations;
-                  FileSystem fs = FileFactory.getFileSystem(path);
-                  FileStatus file = fs.getFileStatus(path);
-                  blkLocations = fs.getFileBlockLocations(path, 0, length);
-                  long blockSize = file.getBlockSize();
-                  long splitSize = computeSplitSize(blockSize, minSize, maxSize);
-                  long bytesRemaining = length;
-                  while (((double) bytesRemaining) / splitSize > 1.1) {
-                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
-                    splits.add(makeSplit(segment.getSegmentNo(), path, length - bytesRemaining,
-                        splitSize, blkLocations[blkIndex].getHosts(),
-                        blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
-                    bytesRemaining -= splitSize;
-                  }
-                  if (bytesRemaining != 0) {
-                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
-                    splits.add(makeSplit(segment.getSegmentNo(), path, length - bytesRemaining,
-                        bytesRemaining, blkLocations[blkIndex].getHosts(),
-                        blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
-                  }
-                } else {
-                  //Create empty hosts array for zero length files
-                  splits.add(makeSplit(segment.getSegmentNo(), path, 0, length, new String[0],
-                      FileFormat.ROW_V1));
-                }
-              }
-            } finally {
-              indexReader.closeThriftReader();
+      if (filterResolverIntf == null) {
+        if (carbonTable != null) {
+          Expression filter = getFilterPredicates(job.getConfiguration());
+          if (filter != null) {
+            carbonTable.processFilterExpression(filter, null, null);
+            filterResolverIntf = carbonTable.resolveFilter(filter);
+          }
+        }
+      }
+      StreamDataMap streamDataMap =
+          DataMapStoreManager.getInstance().getStreamDataMap(carbonTable);
+      streamDataMap.init(filterResolverIntf);
+      List<StreamFile> streamFiles = streamDataMap.prune(streamSegments);
+
+      for (StreamFile streamFile : streamFiles) {
+        if (FileFactory.isFileExist(streamFile.getFilePath())) {
--- End diff --

Why do you need to do the file-exists check here? Please remove it if not needed; it will be costly on S3.

---
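The concern is that FileFactory.isFileExist adds one metadata round trip per stream file on top of the getFileStatus call the split computation performs anyway. A minimal sketch of the usual alternative, again with the plain Hadoop FileSystem API and a made-up helper name, folds the check into that single call:

```java
import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class ExistsCheckSketch {

  /**
   * Instead of fs.exists(path) followed by fs.getFileStatus(path),
   * which costs two HEAD requests on S3, issue one call and treat a
   * missing file as "skip it".
   */
  static FileStatus statusOrNull(FileSystem fs, Path path) throws IOException {
    try {
      return fs.getFileStatus(path);
    } catch (FileNotFoundException e) {
      return null; // file vanished or was never written; caller drops the split
    }
  }
}
```

---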
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2644#discussion_r214886142

--- Diff: streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java ---
@@ -212,9 +213,13 @@ private void initializeAtFirstRow() throws IOException, InterruptedException {
           byte[] col = (byte[]) columnValue;
           output.writeShort(col.length);
           output.writeBytes(col);
+          output.dimStatsCollectors[dimCount].update(col);
         } else {
           output.writeInt((int) columnValue);
+          output.dimStatsCollectors[dimCount].update(ByteUtil.toBytes((int) columnValue));
--- End diff --

update(int) isn't implemented in KeyPageStatsCollector.

---
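The patch works around the missing update(int) overload by serializing the value with ByteUtil.toBytes before calling update(byte[]). A hypothetical sketch of a byte[]-keyed min/max collector (not KeyPageStatsCollector's actual code) shows why the byte form suffices, provided the encoding preserves ordering under unsigned byte comparison:

```java
import java.util.Arrays;

// Tracks per-column min/max over byte[] keys using unsigned lexicographic
// order, the comparison that min/max index pruning relies on.
final class MinMaxBytesSketch {

  private byte[] min;
  private byte[] max;

  void update(byte[] value) {
    if (min == null || compareUnsigned(value, min) < 0) {
      min = Arrays.copyOf(value, value.length);
    }
    if (max == null || compareUnsigned(value, max) > 0) {
      max = Arrays.copyOf(value, value.length);
    }
  }

  // Java 8 has no Arrays.compareUnsigned(byte[], byte[]), so compare manually.
  private static int compareUnsigned(byte[] a, byte[] b) {
    int len = Math.min(a.length, b.length);
    for (int i = 0; i < len; i++) {
      int cmp = (a[i] & 0xFF) - (b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return a.length - b.length;
  }
}
```

One assumption to flag: plain two's-complement int bytes do not sort correctly for negative values under this comparison, so the int-to-bytes encoding must be order-preserving (e.g. by flipping the sign bit) for the resulting min/max to be meaningful.

---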
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2644#discussion_r214887830

--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/StreamDataMap.java ---
(quotes the same StreamDataMap.java diff shown above, down to the class declaration)
--- End diff --

fixed

---
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2644#discussion_r214887850

--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java ---
@@ -360,6 +360,10 @@ private String getKeyUsingTablePath(String tablePath) {
     return null;
   }
 
+  public StreamDataMap getStreamDataMap(CarbonTable table) {
--- End diff --

fixed

---
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2644#discussion_r214889120

--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/StreamDataMap.java ---
(quotes the same StreamDataMap.java diff shown above, down to the readAllIIndexOfSegment call)
--- End diff --

fixed

---
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2644#discussion_r214889635

--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/StreamFile.java ---
(quotes the same StreamFile.java diff shown above, down to the unused setSegmentNo setter)
--- End diff --

fixed

---
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2644#discussion_r214889642

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
(quotes the same getSplitsOfStreaming diff shown above, down to the isFileExist check)
--- End diff --

fixed

---
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2644#discussion_r214893284

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
@@ -342,60 +341,52 @@ public void refreshSegmentCacheIfRequired(JobContext job, CarbonTable carbonTabl
   /**
    * use file list in .carbonindex file to get the split of streaming.
    */
-  public List<InputSplit> getSplitsOfStreaming(JobContext job, AbsoluteTableIdentifier identifier,
-      List<Segment> streamSegments) throws IOException {
+  public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments,
+      CarbonTable carbonTable, FilterResolverIntf filterResolverIntf) throws IOException {
--- End diff --

fixed

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2644

Build Success with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8302/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2644

Build Success with Spark 2.2.1. Please check CI: http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/232/

---