GitHub user QiangCai opened a pull request:
https://github.com/apache/carbondata/pull/2644 [WIP][CARBONDATA-2853] Implement file-level min/max index for streaming segment Implement file-level min/max index to prune stream files in driver side. - [ ] Any interfaces changed? - [ ] Any backward compatibility impacted? - [ ] Document update required? - [ ] Testing done Please provide details on - Whether new unit test cases have been added or why no new tests are required? - How it is tested? Please attach test report. - Is it a performance related change? Please attach the performance test report. - Any additional information to help reviewers in testing this change. - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. You can merge this pull request into a Git repository by running: $ git pull https://github.com/QiangCai/carbondata stream_minmax Alternatively you can review and apply these changes as the patch at: https://github.com/apache/carbondata/pull/2644.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2644 ---- commit c8422231a738da282042c01ce51fa2c9250d3247 Author: QiangCai <qiangcai@...> Date: 2018-08-17T11:07:13Z add file-level min/max index for streaming segement ---- --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2644 SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6295/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2644 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7944/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2644 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6667/ --- |
In reply to this post by qiuchenjian-2
Github user QiangCai commented on the issue:
https://github.com/apache/carbondata/pull/2644 retest this please --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2644 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7959/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2644 Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6683/ --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2644#discussion_r211805649 --- Diff: core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java --- @@ -324,13 +324,13 @@ public static Object getDataBasedOnDataType(String data, DataType actualDataType if (actualDataType == DataTypes.BOOLEAN) { return ByteUtil.toBytes(BooleanConvert.parseBoolean(dimensionValue)); } else if (actualDataType == DataTypes.SHORT) { - return ByteUtil.toBytes(Short.parseShort(dimensionValue)); + return ByteUtil.toXorBytes(Short.parseShort(dimensionValue)); --- End diff -- Will this affect the legacy store? It seems that a value will be encoded differently before and after this modification. If somewhere has used this method to encode and store data before, it will be a problem. --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2644#discussion_r211852492 --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/StreamDataMap.java --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.datamap; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.reader.CarbonIndexFileReader; +import org.apache.carbondata.core.scan.filter.FilterUtil; +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import 
org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.util.CarbonMetadataUtil; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.format.BlockIndex; + +public class StreamDataMap { --- End diff -- add interface annotation --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2644#discussion_r211852629 --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/StreamFile.java --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.datamap; + +import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex; + +public class StreamFile { --- End diff -- For all public class, please add interface annotation --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2644#discussion_r211853068 --- Diff: core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java --- @@ -669,4 +666,44 @@ public static int putBytes(byte[] tgtBytes, int tgtOffset, byte[] srcBytes, int return flattenedData; } + /** + * perform XOR operation on the value, and convert it to byte array for sorting + */ + public static byte[] toXorBytes(short val) { + val = (short)(val ^ Short.MIN_VALUE); + return toBytes(val); + } + + public static byte[] toXorBytes(int val) { + val = val ^ Integer.MIN_VALUE; + return toBytes(val); + } + + public static byte[] toXorBytes(long val) { + val = val ^ Long.MIN_VALUE; + return toBytes(val); + } + + public static byte[] toXorBytes(double val) { + return toXorBytes(Double.doubleToLongBits(val)); + } + + /** + * convert byte array to the value, perform XOR operation on it to recover the real value + */ + public static short toXorShort(byte[] bytes, int offset, final int length) { + return (short)(toShort(bytes, offset, length) ^ Short.MIN_VALUE); + } + + public static int toXorInt(byte[] bytes, int offset, final int length) { --- End diff -- please add comment --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2644#discussion_r211855246 --- Diff: streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java --- @@ -171,9 +179,57 @@ private void initializeAtFirstRow() throws IOException, InterruptedException { writeFileHeader(); } + initializeStatsCollector(); + isFirstRow = false; } + private void initializeStatsCollector() { + // initialize --- End diff -- please explain why the length is isNoDictionaryDimensionColumn.length --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2644#discussion_r211855812 --- Diff: streaming/src/main/java/org/apache/carbondata/streaming/index/StreamFileIndex.java --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.streaming.index; + +import java.io.Serializable; + +import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex; +import org.apache.carbondata.core.metadata.datatype.DataType; + +public class StreamFileIndex implements Serializable { + + private String fileName; --- End diff -- please describe the content of `fileName`, whether it includes the whole path --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2644#discussion_r211859543 --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java --- @@ -334,6 +334,10 @@ public TableDataMap getDataMap(CarbonTable table, DataMapSchema dataMapSchema) { return dataMap; } --- End diff -- please add a testcase to test the bigint datatype, for overflow scenario --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2644#discussion_r211854887 --- Diff: core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java --- @@ -669,4 +666,44 @@ public static int putBytes(byte[] tgtBytes, int tgtOffset, byte[] srcBytes, int return flattenedData; } + /** + * perform XOR operation on the value, and convert it to byte array for sorting --- End diff -- please add description for this method and the scenario --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2644#discussion_r211868981 --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/StreamDataMap.java --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.datamap; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.reader.CarbonIndexFileReader; +import org.apache.carbondata.core.scan.filter.FilterUtil; +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import 
org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.util.CarbonMetadataUtil; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.format.BlockIndex; + +public class StreamDataMap { + + private CarbonTable carbonTable; + + private AbsoluteTableIdentifier identifier; + + private FilterExecuter filterExecuter; + + public StreamDataMap(CarbonTable carbonTable) { + this.carbonTable = carbonTable; + this.identifier = carbonTable.getAbsoluteTableIdentifier(); + } + + public void init(FilterResolverIntf filterExp) { + if (filterExp != null) { + + List<CarbonColumn> minMaxCacheColumns = new ArrayList<>(); + for (CarbonDimension dimension : carbonTable.getDimensions()) { + if (!dimension.isComplex()) { + minMaxCacheColumns.add(dimension); + } + } + minMaxCacheColumns.addAll(carbonTable.getMeasures()); + + List<ColumnSchema> listOfColumns = + carbonTable.getTableInfo().getFactTable().getListOfColumns(); + int[] columnCardinality = new int[listOfColumns.size()]; + for (int index = 0; index < columnCardinality.length; index++) { + columnCardinality[index] = Integer.MAX_VALUE; + } + + SegmentProperties segmentProperties = + new SegmentProperties(listOfColumns, columnCardinality); + + filterExecuter = FilterUtil.getFilterExecuterTree( + filterExp, segmentProperties, null, minMaxCacheColumns); + } + } + + public List<StreamFile> prune(List<Segment> segments) throws IOException { + if (filterExecuter == null) { + return listAllStreamFiles(segments, false); + } else { + List<StreamFile> streamFileList = new ArrayList<>(); + for (StreamFile streamFile : listAllStreamFiles(segments, true)) { + if (hitStreamFile(streamFile)) { + streamFileList.add(streamFile); + streamFile.setMinMaxIndex(null); + } + } + return streamFileList; + } + } + + private boolean hitStreamFile(StreamFile streamFile) { --- End diff -- I 
think changing the name to 'isScanRequired' would be better. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2644 SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6405/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2644 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8063/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2644 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/5/ --- |
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2644#discussion_r213613888 --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/StreamDataMap.java --- @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.datamap; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.reader.CarbonIndexFileReader; +import org.apache.carbondata.core.scan.filter.FilterUtil; +import 
org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.util.CarbonMetadataUtil; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.format.BlockIndex; + +@InterfaceAudience.Internal +public class StreamDataMap { + + private CarbonTable carbonTable; + + private AbsoluteTableIdentifier identifier; + + private FilterExecuter filterExecuter; + + public StreamDataMap(CarbonTable carbonTable) { + this.carbonTable = carbonTable; + this.identifier = carbonTable.getAbsoluteTableIdentifier(); + } + + public void init(FilterResolverIntf filterExp) { + if (filterExp != null) { + + List<CarbonColumn> minMaxCacheColumns = new ArrayList<>(); + for (CarbonDimension dimension : carbonTable.getDimensions()) { + if (!dimension.isComplex()) { + minMaxCacheColumns.add(dimension); + } + } + minMaxCacheColumns.addAll(carbonTable.getMeasures()); + + List<ColumnSchema> listOfColumns = + carbonTable.getTableInfo().getFactTable().getListOfColumns(); + int[] columnCardinality = new int[listOfColumns.size()]; + for (int index = 0; index < columnCardinality.length; index++) { + columnCardinality[index] = Integer.MAX_VALUE; + } + + SegmentProperties segmentProperties = + new SegmentProperties(listOfColumns, columnCardinality); + + filterExecuter = FilterUtil.getFilterExecuterTree( + filterExp, segmentProperties, null, minMaxCacheColumns); + } + } + + public List<StreamFile> prune(List<Segment> segments) throws IOException { + if (filterExecuter == null) { + return listAllStreamFiles(segments, false); + } else { + List<StreamFile> streamFileList = new ArrayList<>(); + for (StreamFile streamFile : listAllStreamFiles(segments, true)) { + if (isScanRequire(streamFile)) { + streamFileList.add(streamFile); + streamFile.setMinMaxIndex(null); + } + } + return 
streamFileList; + } + } + + private boolean isScanRequire(StreamFile streamFile) { + // backward compatibility, old stream file without min/max index + if (streamFile.getMinMaxIndex() == null) { + return true; + } + + byte[][] maxValue = streamFile.getMinMaxIndex().getMaxValues(); + byte[][] minValue = streamFile.getMinMaxIndex().getMinValues(); + BitSet bitSet; + if (filterExecuter instanceof ImplicitColumnFilterExecutor) { --- End diff -- What does this for? --- |
Free forum by Nabble | Edit this page |