GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2819#discussion_r226832821 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java --- @@ -173,6 +221,23 @@ public int getSizeInBytes() { return new BigDecimal(bigInteger, scale); } + @Override public void fillVector(Object valuesToBeConverted, int size, ColumnVectorInfo info, + BitSet nullBitset) { + CarbonColumnVector vector = info.vector; + int precision = info.measure.getMeasure().getPrecision(); + if (valuesToBeConverted instanceof byte[][]) { + byte[][] data = (byte[][]) valuesToBeConverted; + for (int i = 0; i < size; i++) { + if (nullBitset.get(i)) { + vector.putNull(i); + } else { + BigInteger bigInteger = new BigInteger(data[i]); + vector.putDecimal(i, new BigDecimal(bigInteger, scale), precision); --- End diff -- The method `DataTypeUtil.byteToBigDecimal(data[i]), precision)` is different as it calculates scale also from binary, but here we no need of it. --- |
In reply to this post by qiuchenjian-2
GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2819#discussion_r226863559 --- Diff: core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DirectPageWiseVectorFillResultCollector.java --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.scan.collector.impl; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.List; + +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.mutate.DeleteDeltaVo; +import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; +import org.apache.carbondata.core.scan.model.ProjectionDimension; +import org.apache.carbondata.core.scan.model.ProjectionMeasure; +import org.apache.carbondata.core.scan.result.BlockletScannedResult; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch; +import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo; +import org.apache.carbondata.core.scan.result.vector.MeasureDataVectorProcessor; + +/** + * It delegates the vector to fill the data directly from decoded pages. 
+ */ +public class DirectPageWiseVectorFillResultCollector extends AbstractScannedResultCollector { --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2819#discussion_r226863569 --- Diff: core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DirectPageWiseVectorFillResultCollector.java --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.scan.collector.impl; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.List; + +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.mutate.DeleteDeltaVo; +import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; +import org.apache.carbondata.core.scan.model.ProjectionDimension; +import org.apache.carbondata.core.scan.model.ProjectionMeasure; +import org.apache.carbondata.core.scan.result.BlockletScannedResult; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch; +import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo; +import org.apache.carbondata.core.scan.result.vector.MeasureDataVectorProcessor; + +/** + * It delegates the vector to fill the data directly from decoded pages. 
+ */ +public class DirectPageWiseVectorFillResultCollector extends AbstractScannedResultCollector { + + protected ProjectionDimension[] queryDimensions; + + protected ProjectionMeasure[] queryMeasures; + + private ColumnVectorInfo[] dictionaryInfo; + + private ColumnVectorInfo[] noDictionaryInfo; + + private ColumnVectorInfo[] complexInfo; + + private ColumnVectorInfo[] measureColumnInfo; + + ColumnVectorInfo[] allColumnInfo; + + public DirectPageWiseVectorFillResultCollector(BlockExecutionInfo blockExecutionInfos) { + super(blockExecutionInfos); + // initialize only if the current block is not a restructured block else the initialization + // will be taken care by RestructureBasedVectorResultCollector + if (!blockExecutionInfos.isRestructuredBlock()) { --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2819#discussion_r226863575 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java --- @@ -272,5 +293,164 @@ public double decodeDouble(double value) { // this codec is for integer type only throw new RuntimeException("internal error"); } + + @Override + public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) { + CarbonColumnVector vector = vectorInfo.vector; + BitSet nullBits = columnPage.getNullBits(); + DataType dataType = vector.getType(); + DataType type = columnPage.getDataType(); + int pageSize = columnPage.getPageSize(); + BitSet deletedRows = vectorInfo.deletedRows; + fillVector(columnPage, vector, dataType, type, pageSize, vectorInfo); + if (deletedRows == null || deletedRows.isEmpty()) { + for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) { + vector.putNull(i); + } + } + } + + private void fillVector(ColumnPage columnPage, CarbonColumnVector vector, DataType dataType, + DataType type, int pageSize, ColumnVectorInfo vectorInfo) { + if (type == DataTypes.BOOLEAN || type == DataTypes.BYTE) { + byte[] byteData = columnPage.getByteData(); + if (dataType == DataTypes.SHORT) { + for (int i = 0; i < pageSize; i++) { + vector.putShort(i, (short) (max - byteData[i])); + } + } else if (dataType == DataTypes.INT) { + for (int i = 0; i < pageSize; i++) { + vector.putInt(i, (int) (max - byteData[i])); + } + } else if (dataType == DataTypes.LONG) { + for (int i = 0; i < pageSize; i++) { + vector.putLong(i, (max - byteData[i])); + } + } else if (dataType == DataTypes.TIMESTAMP) { + for (int i = 0; i < pageSize; i++) { + vector.putLong(i, (max - byteData[i]) * 1000); + } + } else if (dataType == DataTypes.BOOLEAN) { + for (int i = 0; i < pageSize; i++) { + vector.putByte(i, (byte) (max - byteData[i])); + } + } else if (DataTypes.isDecimal(dataType)) { + DecimalConverterFactory.DecimalConverter decimalConverter = 
vectorInfo.decimalConverter; + int precision = vectorInfo.measure.getMeasure().getPrecision(); + for (int i = 0; i < pageSize; i++) { + BigDecimal decimal = decimalConverter.getDecimal(max - byteData[i]); + vector.putDecimal(i, decimal, precision); + } + } else { + for (int i = 0; i < pageSize; i++) { + vector.putDouble(i, (max - byteData[i])); + } + } + } else if (type == DataTypes.SHORT) { + short[] shortData = columnPage.getShortData(); + if (dataType == DataTypes.SHORT) { + for (int i = 0; i < pageSize; i++) { + vector.putShort(i, (short) (max - shortData[i])); + } + } else if (dataType == DataTypes.INT) { + for (int i = 0; i < pageSize; i++) { + vector.putInt(i, (int) (max - shortData[i])); + } + } else if (dataType == DataTypes.LONG) { + for (int i = 0; i < pageSize; i++) { + vector.putLong(i, (max - shortData[i])); + } + } else if (dataType == DataTypes.TIMESTAMP) { + for (int i = 0; i < pageSize; i++) { + vector.putLong(i, (max - shortData[i]) * 1000); + } + } else if (DataTypes.isDecimal(dataType)) { + DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter; + int precision = vectorInfo.measure.getMeasure().getPrecision(); + for (int i = 0; i < pageSize; i++) { + BigDecimal decimal = decimalConverter.getDecimal(max - shortData[i]); + vector.putDecimal(i, decimal, precision); + } + } else { + for (int i = 0; i < pageSize; i++) { + vector.putDouble(i, (max - shortData[i])); + } + } + + } else if (type == DataTypes.SHORT_INT) { + int[] shortIntData = columnPage.getShortIntData(); + if (dataType == DataTypes.INT) { + for (int i = 0; i < pageSize; i++) { + vector.putInt(i, (int) (max - shortIntData[i])); + } + } else if (dataType == DataTypes.LONG) { + for (int i = 0; i < pageSize; i++) { + vector.putLong(i, (max - shortIntData[i])); + } + } else if (dataType == DataTypes.TIMESTAMP) { + for (int i = 0; i < pageSize; i++) { + vector.putLong(i, (max - shortIntData[i]) * 1000); + } + } else if (DataTypes.isDecimal(dataType)) { + 
DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter; + int precision = vectorInfo.measure.getMeasure().getPrecision(); + for (int i = 0; i < pageSize; i++) { + BigDecimal decimal = decimalConverter.getDecimal(max - shortIntData[i]); + vector.putDecimal(i, decimal, precision); + } + } else { + for (int i = 0; i < pageSize; i++) { + vector.putDouble(i, (max - shortIntData[i])); + } + } + } else if (type == DataTypes.INT) { + int[] intData = columnPage.getIntData(); + if (dataType == DataTypes.INT) { + for (int i = 0; i < pageSize; i++) { + vector.putInt(i, (int) (max - intData[i])); + } + } else if (dataType == DataTypes.LONG) { + for (int i = 0; i < pageSize; i++) { + vector.putLong(i, (max - intData[i])); + } + } else if (dataType == DataTypes.TIMESTAMP) { + for (int i = 0; i < pageSize; i++) { + vector.putLong(i, (max - intData[i]) * 1000); + } + } else if (DataTypes.isDecimal(dataType)) { + DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter; + int precision = vectorInfo.measure.getMeasure().getPrecision(); + for (int i = 0; i < pageSize; i++) { + BigDecimal decimal = decimalConverter.getDecimal(max - intData[i]); + vector.putDecimal(i, decimal, precision); + } + } else { + for (int i = 0; i < pageSize; i++) { + vector.putDouble(i, (max - intData[i])); + } + } + } else if (type == DataTypes.LONG) { + long[] longData = columnPage.getLongData(); + if (dataType == DataTypes.LONG) { + for (int i = 0; i < pageSize; i++) { + vector.putLong(i, (max - longData[i])); + } + } else if (dataType == DataTypes.TIMESTAMP) { + for (int i = 0; i < pageSize; i++) { + vector.putLong(i, (max - longData[i]) * 1000); + } + } else if (DataTypes.isDecimal(dataType)) { + DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter; + int precision = vectorInfo.measure.getMeasure().getPrecision(); + for (int i = 0; i < pageSize; i++) { + BigDecimal decimal = decimalConverter.getDecimal(max - 
longData[i]); + vector.putDecimal(i, decimal, precision); + } + } + } else { + throw new RuntimeException("internal error: " + this.toString()); + } --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2819#discussion_r226863577 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java --- @@ -91,6 +108,8 @@ public double getDouble(int rowId) { } } + + --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2819#discussion_r226863580 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java --- @@ -42,10 +43,26 @@ private LazyColumnPage(ColumnPage columnPage, ColumnPageValueConverter converter this.converter = converter; } + private LazyColumnPage(ColumnPage columnPage, ColumnPageValueConverter converter, + ColumnVectorInfo vectorInfo) { + super(columnPage.getColumnPageEncoderMeta(), columnPage.getPageSize()); + this.columnPage = columnPage; + this.converter = converter; + if (columnPage instanceof DecimalColumnPage) { + vectorInfo.decimalConverter = ((DecimalColumnPage) columnPage).getDecimalConverter(); + } + converter.decodeAndFillVector(columnPage, vectorInfo); + } + public static ColumnPage newPage(ColumnPage columnPage, ColumnPageValueConverter codec) { return new LazyColumnPage(columnPage, codec); } + public static ColumnPage newPage(ColumnPage columnPage, ColumnPageValueConverter codec, + ColumnVectorInfo vectorInfo) { + return new LazyColumnPage(columnPage, codec, vectorInfo); + } --- End diff -- removed --- |
In reply to this post by qiuchenjian-2
GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2819#discussion_r226863584 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java --- @@ -30,9 +35,52 @@ */ private int columnValueSize; - public SafeFixedLengthDimensionDataChunkStore(boolean isInvertedIndex, int columnValueSize) { + private int numOfRows; + + public SafeFixedLengthDimensionDataChunkStore(boolean isInvertedIndex, int columnValueSize, + int numOfRows) { super(isInvertedIndex); this.columnValueSize = columnValueSize; + this.numOfRows = numOfRows; + } + + @Override + public void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data, + ColumnVectorInfo vectorInfo) { + CarbonColumnVector vector = vectorInfo.vector; + fillVector(data, vectorInfo, vector); + } + + private void fillVector(byte[] data, ColumnVectorInfo vectorInfo, CarbonColumnVector vector) { + DataType dataType = vectorInfo.vector.getBlockDataType(); + if (dataType == DataTypes.DATE) { + for (int i = 0; i < numOfRows; i++) { + int surrogateInternal = + CarbonUtil.getSurrogateInternal(data, i * columnValueSize, columnValueSize); + if (surrogateInternal == 1) { + vector.putNull(i); + } else { + vector.putInt(i, surrogateInternal - DateDirectDictionaryGenerator.cutOffDate); + } + } + } else if (dataType == DataTypes.TIMESTAMP) { + for (int i = 0; i < numOfRows; i++) { + int surrogateInternal = + CarbonUtil.getSurrogateInternal(data, i * columnValueSize, columnValueSize); + if (surrogateInternal == 1) { --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2819#discussion_r226863585 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java --- @@ -30,9 +35,52 @@ */ private int columnValueSize; - public SafeFixedLengthDimensionDataChunkStore(boolean isInvertedIndex, int columnValueSize) { + private int numOfRows; + + public SafeFixedLengthDimensionDataChunkStore(boolean isInvertedIndex, int columnValueSize, + int numOfRows) { super(isInvertedIndex); this.columnValueSize = columnValueSize; + this.numOfRows = numOfRows; + } + + @Override + public void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data, + ColumnVectorInfo vectorInfo) { + CarbonColumnVector vector = vectorInfo.vector; + fillVector(data, vectorInfo, vector); + } + + private void fillVector(byte[] data, ColumnVectorInfo vectorInfo, CarbonColumnVector vector) { + DataType dataType = vectorInfo.vector.getBlockDataType(); + if (dataType == DataTypes.DATE) { + for (int i = 0; i < numOfRows; i++) { + int surrogateInternal = + CarbonUtil.getSurrogateInternal(data, i * columnValueSize, columnValueSize); + if (surrogateInternal == 1) { --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2819#discussion_r226863591 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java --- @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.datastore.chunk.store.impl.safe; + +import java.nio.ByteBuffer; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; +import org.apache.carbondata.core.util.ByteUtil; +import org.apache.carbondata.core.util.DataTypeUtil; + +public abstract class AbstractNonDictionaryVectorFiller { + + protected int lengthSize; + protected int numberOfRows; + + public AbstractNonDictionaryVectorFiller(int lengthSize, int numberOfRows) { + this.lengthSize = lengthSize; + this.numberOfRows = numberOfRows; + } + + public abstract void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer); + + public int getLengthFromBuffer(ByteBuffer buffer) { + return buffer.getShort(); + } +} + +class NonDictionaryVectorFillerFactory { + + public static AbstractNonDictionaryVectorFiller getVectorFiller(DataType type, int lengthSize, + int numberOfRows) { + if (type == DataTypes.STRING || type == DataTypes.VARCHAR) { + if (lengthSize == 2) { + return new StringVectorFiller(lengthSize, numberOfRows); + } else { + return new LongStringVectorFiller(lengthSize, numberOfRows); + } + } else if (type == DataTypes.TIMESTAMP) { + return new TimeStampVectorFiller(lengthSize, numberOfRows); + } else if (type == DataTypes.BOOLEAN) { + return new BooleanVectorFiller(lengthSize, numberOfRows); + } else if (type == DataTypes.SHORT) { + return new ShortVectorFiller(lengthSize, numberOfRows); + } else if (type == DataTypes.INT) { + return new IntVectorFiller(lengthSize, numberOfRows); + } else if (type == DataTypes.LONG) { + return new LongStringVectorFiller(lengthSize, numberOfRows); + } + return new StringVectorFiller(lengthSize, numberOfRows); + } + +} + +class StringVectorFiller extends AbstractNonDictionaryVectorFiller { + + public 
StringVectorFiller(int lengthSize, int numberOfRows) { + super(lengthSize, numberOfRows); + } + + @Override + public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) { + // start position will be used to store the current data position + int startOffset = 0; + int currentOffset = lengthSize; + ByteUtil.UnsafeComparer comparer = ByteUtil.UnsafeComparer.INSTANCE; + for (int i = 0; i < numberOfRows - 1; i++) { + buffer.position(startOffset); --- End diff -- will add in another pr --- |
In reply to this post by qiuchenjian-2
GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2819#discussion_r226863599 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java --- @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.datastore.chunk.store.impl.safe; + +import java.nio.ByteBuffer; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; +import org.apache.carbondata.core.util.ByteUtil; +import org.apache.carbondata.core.util.DataTypeUtil; + +public abstract class AbstractNonDictionaryVectorFiller { + + protected int lengthSize; + protected int numberOfRows; + + public AbstractNonDictionaryVectorFiller(int lengthSize, int numberOfRows) { + this.lengthSize = lengthSize; + this.numberOfRows = numberOfRows; + } + + public abstract void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer); --- End diff -- will add in another pr --- |
In reply to this post by qiuchenjian-2
GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2819#discussion_r226863615 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java --- @@ -49,6 +51,29 @@ public void putArray(int[] invertedIndex, int[] invertedIndexReverse, byte[] dat this.dimensionDataChunkStore.putArray(invertedIndex, invertedIndexReverse, data); } + @Override + public void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data, + ColumnVectorInfo vectorInfo) { + int columnValueSize = dimensionDataChunkStore.getColumnValueSize(); + int rowsNum = data.length / columnValueSize; + CarbonColumnVector vector = vectorInfo.vector; + if (!dictionary.isDictionaryUsed()) { + vector.setDictionary(dictionary); + dictionary.setDictionaryUsed(); + } + for (int i = 0; i < rowsNum; i++) { + int surrogate = CarbonUtil.getSurrogateInternal(data, i * columnValueSize, columnValueSize); + if (surrogate == CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY) { + vector.putNull(i); + vector.getDictionaryVector().putNull(i); + } else { + vector.putNotNull(i); + vector.getDictionaryVector().putInt(i, surrogate); --- End diff -- it is as per old code, will check feasible --- |
In reply to this post by qiuchenjian-2
GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2819#discussion_r226863618 --- Diff: integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapperDirect.java --- @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.vectorreader; + +import java.math.BigDecimal; + +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; +import org.apache.carbondata.core.scan.result.vector.CarbonDictionary; + +import org.apache.spark.sql.CarbonVectorProxy; +import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil; +import org.apache.spark.sql.types.Decimal; + +/** + * Fills the vector directly with out considering any deleted rows. + */ +class ColumnarVectorWrapperDirect implements CarbonColumnVector { + + protected CarbonVectorProxy.ColumnVectorProxy sparkColumnVectorProxy; + + protected CarbonVectorProxy carbonVectorProxy; --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2819#discussion_r226863624 --- Diff: integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java --- @@ -290,12 +296,24 @@ public void initBatch(MemoryMode memMode, StructType partitionColumns, } } CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length]; - boolean[] filteredRows = new boolean[vectorProxy.numRows()]; - for (int i = 0; i < fields.length; i++) { - vectors[i] = new ColumnarVectorWrapper(vectorProxy, filteredRows, i); - if (isNoDictStringField[i]) { - if (vectors[i] instanceof ColumnarVectorWrapper) { - ((ColumnarVectorWrapper) vectors[i]).reserveDictionaryIds(); + boolean[] filteredRows = null; + if (queryModel.isDirectVectorFill()) { + for (int i = 0; i < fields.length; i++) { + vectors[i] = new ColumnarVectorWrapperDirect(vectorProxy, i); + if (isNoDictStringField[i]) { + if (vectors[i] instanceof ColumnarVectorWrapperDirect) { --- End diff -- removed --- |
In reply to this post by qiuchenjian-2
GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2819#discussion_r226863628 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java --- @@ -95,10 +99,24 @@ public ColumnPage decode(byte[] input, int offset, int length) throws MemoryExce return LazyColumnPage.newPage(decodedPage, converter); } + @Override + public ColumnPage decodeAndFillVector(byte[] input, int offset, int length, --- End diff -- removed --- |
In reply to this post by qiuchenjian-2
GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2819#discussion_r226863633 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java --- @@ -178,6 +196,143 @@ public double decodeDouble(float value) { public double decodeDouble(double value) { return value; } + + @Override public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) { + CarbonColumnVector vector = vectorInfo.vector; + BitSet nullBits = columnPage.getNullBits(); + DataType dataType = vector.getType(); + DataType type = columnPage.getDataType(); + int pageSize = columnPage.getPageSize(); + BitSet deletedRows = vectorInfo.deletedRows; + fillVector(columnPage, vector, dataType, type, pageSize, vectorInfo); + if (deletedRows == null || deletedRows.isEmpty()) { + for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) { + vector.putNull(i); + } + } + } + + private void fillVector(ColumnPage columnPage, CarbonColumnVector vector, DataType dataType, + DataType type, int pageSize, ColumnVectorInfo vectorInfo) { + if (type == DataTypes.BOOLEAN || type == DataTypes.BYTE) { + byte[] byteData = columnPage.getByteData(); + if (dataType == DataTypes.SHORT) { + for (int i = 0; i < pageSize; i++) { + vector.putShort(i, (short) byteData[i]); + } + } else if (dataType == DataTypes.INT) { + for (int i = 0; i < pageSize; i++) { + vector.putInt(i, (int) byteData[i]); + } + } else if (dataType == DataTypes.LONG) { + for (int i = 0; i < pageSize; i++) { + vector.putLong(i, byteData[i]); + } + } else if (dataType == DataTypes.TIMESTAMP) { + for (int i = 0; i < pageSize; i++) { + vector.putLong(i, byteData[i] * 1000); + } + } else if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) { + vector.putBytes(0, pageSize, byteData, 0); + } else if (DataTypes.isDecimal(dataType)) { + DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter; + decimalConverter.fillVector(byteData, pageSize, 
vectorInfo, columnPage.getNullBits()); + } else { + for (int i = 0; i < pageSize; i++) { + vector.putDouble(i, byteData[i]); + } + } + } else if (type == DataTypes.SHORT) { + short[] shortData = columnPage.getShortData(); + if (dataType == DataTypes.SHORT) { + vector.putShorts(0, pageSize, shortData, 0); + } else if (dataType == DataTypes.INT) { + for (int i = 0; i < pageSize; i++) { + vector.putInt(i, (int) shortData[i]); + } + } else if (dataType == DataTypes.LONG) { + for (int i = 0; i < pageSize; i++) { + vector.putLong(i, shortData[i]); + } + } else if (dataType == DataTypes.TIMESTAMP) { + for (int i = 0; i < pageSize; i++) { + vector.putLong(i, shortData[i] * 1000); + } + } else if (DataTypes.isDecimal(dataType)) { + DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter; + decimalConverter.fillVector(shortData, pageSize, vectorInfo, columnPage.getNullBits()); + } else { + for (int i = 0; i < pageSize; i++) { + vector.putDouble(i, shortData[i]); + } + } + + } else if (type == DataTypes.SHORT_INT) { + int[] shortIntData = columnPage.getShortIntData(); + if (dataType == DataTypes.INT) { + vector.putInts(0, pageSize, shortIntData, 0); + } else if (dataType == DataTypes.LONG) { + for (int i = 0; i < pageSize; i++) { + vector.putLong(i, shortIntData[i]); + } + } else if (dataType == DataTypes.TIMESTAMP) { + for (int i = 0; i < pageSize; i++) { + vector.putLong(i, shortIntData[i] * 1000); + } + } else if (DataTypes.isDecimal(dataType)) { + DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter; + decimalConverter.fillVector(shortIntData, pageSize, vectorInfo, columnPage.getNullBits()); + } else { + for (int i = 0; i < pageSize; i++) { + vector.putDouble(i, shortIntData[i]); + } + } + } else if (type == DataTypes.INT) { + int[] intData = columnPage.getIntData(); + if (dataType == DataTypes.INT) { + vector.putInts(0, pageSize, intData, 0); + } else if (dataType == DataTypes.LONG) { + for (int i = 0; i 
< pageSize; i++) { + vector.putLong(i, intData[i]); + } + } else if (dataType == DataTypes.TIMESTAMP) { + for (int i = 0; i < pageSize; i++) { + vector.putLong(i, intData[i] * 1000); + } + } else if (DataTypes.isDecimal(dataType)) { + DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter; + decimalConverter.fillVector(intData, pageSize, vectorInfo, columnPage.getNullBits()); + } else { + for (int i = 0; i < pageSize; i++) { + vector.putDouble(i, intData[i]); + } + } + } else if (type == DataTypes.LONG) { + long[] longData = columnPage.getLongData(); + if (dataType == DataTypes.LONG) { + for (int i = 0; i < pageSize; i++) { + vector.putLong(i, longData[i]); + } + vector.putLongs(0, pageSize, longData, 0); + } else if (dataType == DataTypes.TIMESTAMP) { + for (int i = 0; i < pageSize; i++) { + vector.putLong(i, longData[i] * 1000); + } + } else if (DataTypes.isDecimal(dataType)) { + DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter; + decimalConverter.fillVector(longData, pageSize, vectorInfo, columnPage.getNullBits()); + } + } else if (DataTypes.isDecimal(type)) { + if (DataTypes.isDecimal(dataType)) { --- End diff -- removed --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2819#discussion_r226863637 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java --- @@ -178,6 +196,143 @@ public double decodeDouble(float value) { public double decodeDouble(double value) { return value; } + + @Override public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) { + CarbonColumnVector vector = vectorInfo.vector; + BitSet nullBits = columnPage.getNullBits(); + DataType dataType = vector.getType(); + DataType type = columnPage.getDataType(); --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2819#discussion_r226863640 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java --- @@ -46,10 +46,37 @@ public FixedLengthDimensionColumnPage(byte[] dataChunk, int[] invertedIndex, dataChunk.length; dataChunkStore = DimensionChunkStoreFactory.INSTANCE .getDimensionChunkStore(columnValueSize, isExplicitSorted, numberOfRows, totalSize, - DimensionStoreType.FIXED_LENGTH, null); + DimensionStoreType.FIXED_LENGTH, null, false); dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunk); } + /** + * Constructor + * + * @param dataChunk data chunk + * @param invertedIndex inverted index + * @param invertedIndexReverse reverse inverted index + * @param numberOfRows number of rows + * @param columnValueSize size of each column value + * @param vectorInfo vector to be filled with decoded column page. + */ + public FixedLengthDimensionColumnPage(byte[] dataChunk, int[] invertedIndex, --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2819#discussion_r226863645 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java --- @@ -29,6 +31,12 @@ */ ColumnPage decode(byte[] input, int offset, int length) throws MemoryException, IOException; + /** + * Apply decoding algorithm on input byte array and fill the vector here. + */ + ColumnPage decodeAndFillVector(byte[] input, int offset, int length, ColumnVectorInfo vectorInfo, --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2819#discussion_r226863651 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java --- @@ -91,6 +93,25 @@ public void putArray(final int[] invertedIndex, final int[] invertedIndexReverse } } + @Override + public void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data, + ColumnVectorInfo vectorInfo) { + this.invertedIndexReverse = invertedIndex; + + // as first position will be start from 2 byte as data is stored first in the memory block + // we need to skip first two bytes this is because first two bytes will be length of the data + // which we have to skip + int lengthSize = getLengthSize(); + // creating a byte buffer which will wrap the length of the row + CarbonColumnVector vector = vectorInfo.vector; + DataType dt = vector.getType(); + ByteBuffer buffer = ByteBuffer.wrap(data); + BitSet deletedRows = vectorInfo.deletedRows; --- End diff -- removed --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2819#discussion_r226863654 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java --- @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.datastore.chunk.store.impl.safe; + +import java.nio.ByteBuffer; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; +import org.apache.carbondata.core.util.ByteUtil; +import org.apache.carbondata.core.util.DataTypeUtil; + +public abstract class AbstractNonDictionaryVectorFiller { + + protected int lengthSize; + protected int numberOfRows; + + public AbstractNonDictionaryVectorFiller(int lengthSize, int numberOfRows) { + this.lengthSize = lengthSize; + this.numberOfRows = numberOfRows; + } + + public abstract void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer); + + public int getLengthFromBuffer(ByteBuffer buffer) { + return buffer.getShort(); + } +} + +class NonDictionaryVectorFillerFactory { + + public static AbstractNonDictionaryVectorFiller getVectorFiller(DataType type, int lengthSize, + int numberOfRows) { + if (type == DataTypes.STRING || type == DataTypes.VARCHAR) { + if (lengthSize == 2) { + return new StringVectorFiller(lengthSize, numberOfRows); + } else { + return new LongStringVectorFiller(lengthSize, numberOfRows); + } + } else if (type == DataTypes.TIMESTAMP) { + return new TimeStampVectorFiller(lengthSize, numberOfRows); + } else if (type == DataTypes.BOOLEAN) { + return new BooleanVectorFiller(lengthSize, numberOfRows); + } else if (type == DataTypes.SHORT) { + return new ShortVectorFiller(lengthSize, numberOfRows); + } else if (type == DataTypes.INT) { + return new IntVectorFiller(lengthSize, numberOfRows); + } else if (type == DataTypes.LONG) { + return new LongStringVectorFiller(lengthSize, numberOfRows); --- End diff -- ok --- |
Free forum by Nabble | Edit this page |