Github user gvramana commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/641#discussion_r105852481

--- Diff: core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java ---
@@ -0,0 +1,77 @@
[license header and imports trimmed]
+/**
+ * This class will provide the result collector instance based on the required type
+ */
+public class ResultCollectorFactory {
+
+  /**
+   * logger of result collector factory
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(ResultCollectorFactory.class.getName());
+
+  /**
+   * This method will create result collector based on the given type
+   *
+   * @param blockExecutionInfo
+   * @return
+   */
+  public static AbstractScannedResultCollector getScannedResultCollector(
+      BlockExecutionInfo blockExecutionInfo) {
+    AbstractScannedResultCollector scannerResultAggregator = null;
+    if (blockExecutionInfo.isRawRecordDetailQuery()) {
+      if (blockExecutionInfo.isRestructuredBlock()) {
+        LOGGER.info("Restructure based raw collector is used to scan and collect the data");
+        scannerResultAggregator = new RestructureBasedRawResultCollector(blockExecutionInfo);
+      } else {
+        LOGGER.info("Row based raw collector is used to scan and collect the data");
+        scannerResultAggregator = new RawBasedResultCollector(blockExecutionInfo);
+      }
+    } else if (blockExecutionInfo.isVectorBatchCollector()) {
+      if (blockExecutionInfo.isRestructuredBlock()) {
+        LOGGER.info("Restructure dictionary vector collector is used to scan and collect the data");
+        scannerResultAggregator = new RestructureBasedVectorResultCollector(blockExecutionInfo);
--- End diff --

Each restructure collector should derive from its non-restructure counterpart:
- RestructureBasedVectorResultCollector should be derived from DictionaryBasedVectorResultCollector
- RestructureBasedDictionaryResultCollector should be derived from DictionaryBasedResultCollector
- RestructureBasedRawResultCollector should be derived from RawBasedResultCollector
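A minimal sketch of the suggested hierarchy (constructor signatures follow the collectors in this PR; which methods each subclass would override is an assumption):

// Each restructure-aware collector extends its non-restructure counterpart in
// org.apache.carbondata.core.scan.collector.impl, reusing its scan/collect logic
// and overriding only the steps that deal with newly added or removed columns.
public class RestructureBasedRawResultCollector extends RawBasedResultCollector {
  public RestructureBasedRawResultCollector(BlockExecutionInfo blockExecutionInfos) {
    super(blockExecutionInfos);
    // key generators for the restructured and current-block schema go here
  }
}

public class RestructureBasedDictionaryResultCollector extends DictionaryBasedResultCollector {
  public RestructureBasedDictionaryResultCollector(BlockExecutionInfo blockExecutionInfos) {
    super(blockExecutionInfos);
  }
}

public class RestructureBasedVectorResultCollector extends DictionaryBasedVectorResultCollector {
  public RestructureBasedVectorResultCollector(BlockExecutionInfo blockExecutionInfos) {
    super(blockExecutionInfos);
  }
}

This way the factory above can keep returning AbstractScannedResultCollector while the common collection code lives in one place per collector family.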
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/641#discussion_r105870912

--- Diff: core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java ---
@@ -0,0 +1,265 @@
[license header and imports trimmed]
+/**
+ * It is not a collector it is just a scanned result holder.
+ */
+public class RestructureBasedRawResultCollector extends AbstractScannedResultCollector {
+
+  /**
+   * Key generator which will form the mdKey according to latest schema
+   */
+  private KeyGenerator restructuredKeyGenerator;
+
+  /**
+   * Key generator for uncompressing current block values
+   */
+  private KeyGenerator updatedCurrentBlockKeyGenerator;
+
+  public RestructureBasedRawResultCollector(BlockExecutionInfo blockExecutionInfos) {
+    super(blockExecutionInfos);
+    initRestructuredKeyGenerator();
+    initCurrentBlockKeyGenerator();
+  }
[intermediate methods trimmed]
+  /**
+   * This method will fill the no dictionary byte array with newly added no dictionary columns
+   *
+   * @param noDictionaryKeyArray
+   * @return
+   */
+  private byte[][] fillNoDictionaryKeyArrayWithLatestSchema(byte[][] noDictionaryKeyArray) {
+    QueryDimension[] actualQueryDimensions = tableBlockExecutionInfos.getActualQueryDimensions();
+    List<byte[]> noDictionaryValueList = new ArrayList<>(actualQueryDimensions.length);
--- End diff --

We have to use an array instead of a List here, sized up front and filled in a single pass over the dimensions.
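A sketch of the array-based version (the precomputed no-dictionary column count and the default-value fallback are assumptions drawn from the surrounding diff):

// Size the result array once and fill it in one loop, avoiding the intermediate
// List<byte[]> and the extra copy a final toArray would need.
private byte[][] fillNoDictionaryKeyArrayWithLatestSchema(byte[][] noDictionaryKeyArray) {
  QueryDimension[] actualQueryDimensions = tableBlockExecutionInfos.getActualQueryDimensions();
  // noDictionaryCount assumed precomputed once, e.g. in the constructor
  byte[][] updatedNoDictionaryKeys = new byte[noDictionaryCount][];
  int existingColumnIndex = 0;
  int targetIndex = 0;
  for (int i = 0; i < actualQueryDimensions.length; i++) {
    CarbonDimension dimension = actualQueryDimensions[i].getDimension();
    if (!dimension.hasEncoding(Encoding.DICTIONARY) && !dimension.isComplex()) {
      if (dimensionInfo.getDimensionExists()[i]) {
        // column present in this block: reuse the scanned value
        updatedNoDictionaryKeys[targetIndex++] = noDictionaryKeyArray[existingColumnIndex++];
      } else {
        // newly added column: fall back to its stored default value (may be null)
        updatedNoDictionaryKeys[targetIndex++] = dimension.getDefaultValue();
      }
    }
  }
  return updatedNoDictionaryKeys;
}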
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/641#discussion_r105895041

--- Diff: core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java ---
@@ -420,6 +438,54 @@ private static void getChildDimensionDictionaryDetail(CarbonDimension queryDimen
   }

   /**
+   * This method will create the updated list of filter measures present in the current block
+   *
+   * @param queryFilterMeasures
+   * @param currentBlockMeasures
+   * @return
+   */
+  public static Set<CarbonMeasure> getUpdatedFilterMeasures(Set<CarbonMeasure> queryFilterMeasures,
--- End diff --

Move this function to AbstractQueryExecutor.
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/641#discussion_r105868970

--- Diff: core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java ---
@@ -0,0 +1,265 @@
[quoted context trimmed; same file as above]
+  /**
+   * This method will fill the dictionary key array with newly added dictionary columns if any
+   *
+   * @param dictionaryKeyArray
+   * @return
+   */
+  private byte[] fillDictionaryKeyArrayWithLatestSchema(byte[] dictionaryKeyArray) {
--- End diff --

Add an unsafe KeyGenerator and convert the logic below to work on unsafe memory.
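The unsafe key generator itself is not part of this PR; as a first step in that direction, the merge can at least avoid per-row boxing. A sketch (dictionaryDimensionCount is an assumed precomputed field; the branching mirrors the diff above):

// Use a primitive long[] instead of List<Long>, so no Long boxing and no
// ArrayUtils.toPrimitive copy happens for every scanned row.
private byte[] fillDictionaryKeyArrayWithLatestSchema(byte[] dictionaryKeyArray) {
  long[] currentBlockKeys = updatedCurrentBlockKeyGenerator.getKeyArray(dictionaryKeyArray);
  // dictionaryDimensionCount assumed precomputed once (dictionary query dimensions)
  long[] keysWithLatestSchema = new long[dictionaryDimensionCount];
  int existingColumnKeyArrayIndex = 0;
  int targetIndex = 0;
  for (int i = 0; i < dimensionInfo.getDimensionExists().length; i++) {
    QueryDimension queryDimension = tableBlockExecutionInfos.getActualQueryDimensions()[i];
    if (CarbonUtil.hasEncoding(queryDimension.getDimension().getEncoder(), Encoding.DICTIONARY)) {
      if (dimensionInfo.getDimensionExists()[i]) {
        // column present in this block: take the uncompressed key as-is
        keysWithLatestSchema[targetIndex++] = currentBlockKeys[existingColumnKeyArrayIndex++];
      } else {
        // newly added column: use its default surrogate, else the member default
        Object defaultValue = dimensionInfo.getDefaultValues()[i];
        keysWithLatestSchema[targetIndex++] = (null != defaultValue)
            ? ((Integer) defaultValue).longValue()
            : CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
      }
    }
  }
  try {
    return restructuredKeyGenerator.generateKey(keysWithLatestSchema);
  } catch (KeyGenException e) {
    LOGGER.error(e, e.getMessage());
    return dictionaryKeyArray;
  }
}

An unsafe variant would go further and pack the key bytes into off-heap memory directly instead of materializing the long[] at all.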
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/641#discussion_r105867937

--- Diff: core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java ---
@@ -0,0 +1,265 @@
[quoted context trimmed; same file as above]
+      Object[] row = new Object[1 + queryMeasures.length];
+      wrapper = new ByteArrayWrapper();
+      wrapper.setDictionaryKey(fillDictionaryKeyArrayWithLatestSchema(dictionaryKeyArray));
--- End diff --

Call fillDictionaryKeyArrayWithLatestSchema only if there is an actual change in the corresponding byte[]; same for fillNoDictionaryKeyArrayWithLatestSchema.
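A sketch of the guarded calls (the two boolean fields are assumptions, computed once from dimensionInfo when the collector is constructed):

// Skip the per-row key rewrite entirely when the restructure added no
// dictionary / no-dictionary column, and pass the scanned arrays through.
if (dictionaryColumnAdded) {
  wrapper.setDictionaryKey(fillDictionaryKeyArrayWithLatestSchema(dictionaryKeyArray));
} else {
  wrapper.setDictionaryKey(dictionaryKeyArray);
}
if (noDictionaryColumnAdded) {
  wrapper.setNoDictionaryKeys(fillNoDictionaryKeyArrayWithLatestSchema(noDictionaryKeyArray));
} else {
  wrapper.setNoDictionaryKeys(noDictionaryKeyArray);
}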
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/641#discussion_r105880005

--- Diff: core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java ---
@@ -0,0 +1,321 @@
[license header, imports and earlier methods trimmed]
+  /**
+   * This method will fill the default values of non existing measures in the current block
+   */
+  private void fillDataForNonExistingMeasures() {
+    for (int i = 0; i < tableBlockExecutionInfos.getActualQueryMeasures().length; i++) {
+      if (!measureInfo.getMeasureExists()[i]) {
+        CarbonMeasure measure = tableBlockExecutionInfos.getActualQueryMeasures()[i].getMeasure();
+        ColumnVectorInfo columnVectorInfo = allColumnInfo[i];
+        CarbonColumnVector vector = allColumnInfo[i].vector;
+        int offset = columnVectorInfo.offset;
+        int len = offset + columnVectorInfo.size;
+        int vectorOffset = columnVectorInfo.vectorOffset;
+        // convert decimal default value to spark decimal type so that a new object is
+        // not created for every row added
+        Object defaultValue = convertDecimalValue(measure, measureInfo.getDefaultValues()[i]);
+        for (int j = offset; j < len; j++) {
--- End diff --

Add a batch default-value put to ColumnVector.
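A hypothetical batch setter on CarbonColumnVector would collapse these per-row loops into one call per vector. The signatures below mirror Spark's WritableColumnVector batch setters and are assumptions, not existing CarbonData API:

// Proposed additions to the CarbonColumnVector interface: write `count` copies
// of one value starting at `rowId`, so callers need no per-row loop.
void putInts(int rowId, int count, int value);
void putLongs(int rowId, int count, long value);
void putNulls(int rowId, int count);

// fillDataForNonExistingMeasures would then reduce to something like:
// vector.putLongs(vectorOffset, columnVectorInfo.size, (long) defaultValue);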
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/641#discussion_r105974874

--- Diff: core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java ---
@@ -245,6 +248,36 @@ public static DataType getDataType(String dataTypeStr) {
   }

   /**
+   * This method will convert the data according to its data type and perform a
+   * special handling for decimal data types
+   *
+   * @param dataInBytes
+   * @param dimension
+   * @return
+   */
+  public static Object getDataBasedOnDataType(byte[] dataInBytes, CarbonDimension dimension) {
--- End diff --

Change the previous function to handle this logic, i.e. delegate to getDataBasedOnDataType(dataInBytes, dimension.getDataType()).
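A minimal sketch of the delegation (whether the decimal special-casing can be folded into the DataType-based overload is an assumption):

// Keep the dimension overload as a thin wrapper so the byte[]-to-value
// conversion logic lives in exactly one place.
public static Object getDataBasedOnDataType(byte[] dataInBytes, CarbonDimension dimension) {
  return getDataBasedOnDataType(dataInBytes, dimension.getDataType());
}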
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/641#discussion_r105972588

--- Diff: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---
@@ -1520,6 +1572,69 @@ public static String getFormatFromProperty(DataType dataType) {
   }

   /**
+   * This method will delete the dictionary files for the given column IDs and
+   * clear the dictionary cache
+   *
+   * @param dictionaryColumns
+   * @param carbonTable
+   */
+  public static void deleteDictionaryFileAndCache(List<CarbonColumn> dictionaryColumns,
--- End diff --

Create a class to manage dictionaries and move this method into it.
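A sketch of such a manager class (name and package are assumptions; the method body moves over unchanged):

// Dedicated home for dictionary lifecycle operations, instead of growing CarbonUtil.
public final class CarbonDictionaryManager {

  private CarbonDictionaryManager() {
    // static utility holder
  }

  /**
   * Delete the dictionary files for the given columns and clear their cache entries.
   */
  public static void deleteDictionaryFileAndCache(List<CarbonColumn> dictionaryColumns,
      CarbonTable carbonTable) {
    // body moved as-is from CarbonUtil.deleteDictionaryFileAndCache
  }
}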
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/641#discussion_r105922838

--- Diff: core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java ---
@@ -571,6 +603,53 @@ public static DimColumnFilterInfo getFilterListForAllMembersRS(Expression expres
   }

   /**
+   * This method will check whether a default value for the non-existing column is present
+   * in the filter values list
+   *
+   * @param dimColumnEvaluatorInfo
+   * @return
+   */
+  public static boolean isDimensionDefaultValuePresentInFilterValues(
--- End diff --

Move this to the RestructureFilterEvaluator abstract class.
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/641#discussion_r105969726

--- Diff: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---
@@ -966,20 +974,22 @@ public static void clearBlockCache(List<AbstractIndex> dataBlocks) {
     List<Boolean> isDictionaryDimensions = new ArrayList<Boolean>();
     Set<Integer> processedColumnGroup = new HashSet<Integer>();
     for (CarbonDimension carbonDimension : tableDimensionList) {
-      List<CarbonDimension> childs = carbonDimension.getListOfChildDimensions();
-      //assuming complex dimensions will always be atlast
-      if (null != childs && childs.size() > 0) {
-        break;
-      }
-      if (carbonDimension.isColumnar() && hasEncoding(carbonDimension.getEncoder(),
-          Encoding.DICTIONARY)) {
-        isDictionaryDimensions.add(true);
-      } else if (!carbonDimension.isColumnar()) {
-        if (processedColumnGroup.add(carbonDimension.columnGroupId())) {
+      if (!carbonDimension.isInvisible()) {
--- End diff --

The list being created should have two getters: one that returns the dimensions including invisible ones, and one that returns only the visible ones. Use the appropriate getter at each call site.
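A sketch of the getter pair (method and field names are assumptions):

// Filter invisible dimensions once behind a getter pair, so call sites
// like clearBlockCache never need an inline isInvisible() check.
public List<CarbonDimension> getVisibleDimensions() {
  List<CarbonDimension> visibleDimensions = new ArrayList<>(allDimensions.size());
  for (CarbonDimension dimension : allDimensions) {
    if (!dimension.isInvisible()) {
      visibleDimensions.add(dimension);
    }
  }
  return visibleDimensions;
}

public List<CarbonDimension> getAllDimensions() {
  // includes invisible (e.g. dropped/restructured) columns
  return allDimensions;
}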
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/641#discussion_r105913851

--- Diff: core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java ---
@@ -38,37 +52,174 @@
    * table blocks in that case we need to select only those dimension out of
    * query dimension which is present in the current table block
    *
+   * @param blockExecutionInfo
    * @param queryDimensions
    * @param tableBlockDimensions
+   * @param tableComplexDimension
    * @return list of query dimension which is present in the table block
    */
-  public static List<QueryDimension> getUpdatedQueryDimension(List<QueryDimension> queryDimensions,
+  public static List<QueryDimension> createDimensionInfoAndGetUpdatedQueryDimension(
+      BlockExecutionInfo blockExecutionInfo, List<QueryDimension> queryDimensions,
       List<CarbonDimension> tableBlockDimensions, List<CarbonDimension> tableComplexDimension) {
[matching logic trimmed]
+        // add default value only in case query dimension is not found in the current block
+        if (!isDimensionExists[dimIndex]) {
+          defaultValues[dimIndex] = validateAndGetDefaultValue(queryDimension.getDimension());
+          blockExecutionInfo.setRestructuredBlock(true);
--- End diff --

This has to be set only if the complex dimensions also do not match.
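A sketch of the deferred flag, inside the per-dimension loop (matching complex dimensions by column id is an assumption based on the block-dimension loop above):

// Only mark the block as restructured after the complex dimensions have also
// been searched, so a query dimension found there does not trigger a restructure.
boolean foundInComplexDimensions = false;
for (CarbonDimension complexDimension : tableComplexDimension) {
  if (complexDimension.getColumnId().equals(queryDimension.getDimension().getColumnId())) {
    foundInComplexDimensions = true;
    break;
  }
}
if (!isDimensionExists[dimIndex] && !foundInComplexDimensions) {
  defaultValues[dimIndex] = validateAndGetDefaultValue(queryDimension.getDimension());
  blockExecutionInfo.setRestructuredBlock(true);
}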
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/641#discussion_r105962095

--- Diff: core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/visitor/NoDictionaryTypeVisitor.java ---
@@ -45,7 +47,15 @@ public void populateFilterResolvedInfo(DimColumnResolvedFilterInfo visitableObj,
     DimColumnFilterInfo resolvedFilterObject = null;
     List<String> evaluateResultListFinal;
     try {
-      evaluateResultListFinal = metadata.getExpression().evaluate(null).getListAsString();
+      ExpressionResult result = metadata.getExpression().evaluate(null);
--- End diff --

If only the is-null scenario needs to be checked, use expression.isInstanceOf[EqualToExpression].isNull instead.
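In Java terms the check could look like this (assuming EqualToExpression exposes an isNull flag for the `col IS NULL` case):

// Detect the null-equality filter directly from the expression tree instead of
// evaluating the expression and inspecting its result list.
Expression expression = metadata.getExpression();
boolean isNullFilter =
    expression instanceof EqualToExpression && ((EqualToExpression) expression).isNull;
if (isNullFilter) {
  // resolve as a null-membership filter without building the full value list
}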
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/641#discussion_r105936938

--- Diff: core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java ---
@@ -46,44 +52,71 @@ public RowLevelRangeGrtThanFiterExecuterImpl(
     super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier,
         segmentProperties, null);
     this.filterRangeValues = filterRangeValues;
+    checkIfDefaultValueIsPresentInFilterList();
+  }
+
+  /**
+   * This method will check whether default value is present in the given filter values
+   */
+  private void checkIfDefaultValueIsPresentInFilterList() {
--- End diff --

Rename this to say that it checks whether the default value matches the filter.
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/641#discussion_r106092052

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---
@@ -129,4 +134,80 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     case databaseName ~ tableName ~ limit =>
       ShowLoadsCommand(convertDbNameToLowerCase(databaseName), tableName.toLowerCase(), limit)
   }
+
+  protected lazy val alterTableModifyDataType: Parser[LogicalPlan] =
+    ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ CHANGE ~ ident ~ ident ~
+      ident ~ opt("(" ~> rep1sep(valueOptions, ",") <~ ")") <~ opt(";") ^^ {
--- End diff --

Move this to SparkDDLParser.
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/641#discussion_r106094811

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---
@@ -136,6 +140,298 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab
   }
 }
+
+private[sql] case class AlterTableDataTypeChange(
--- End diff --

Move the ALTER TABLE related commands to the AlterTableCommands file.
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/641#discussion_r106091809

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---
@@ -129,4 +134,80 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     case databaseName ~ tableName ~ limit =>
       ShowLoadsCommand(convertDbNameToLowerCase(databaseName), tableName.toLowerCase(), limit)
   }
+
+  protected lazy val alterTableModifyDataType: Parser[LogicalPlan] =
+    ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ CHANGE ~ ident ~ ident ~
+      ident ~ opt("(" ~> rep1sep(valueOptions, ",") <~ ")") <~ opt(";") ^^ {
+      case dbName ~ table ~ change ~ columnName ~ columnNameCopy ~ dataType ~ values =>
+        // both the column names should be same
+        CommonUtil.validateColumnNames(columnName, columnNameCopy)
--- End diff --

Directly check equalsIgnoreCase here; no separate util function is required.
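Inlined, the check could look like this sketch; the exception type and message are illustrative stand-ins, since the parser would actually raise MalformedCarbonCommandException:

    // Compare the two column names directly instead of going through CommonUtil.
    def validateSameColumn(columnName: String, columnNameCopy: String): Unit =
      if (!columnName.equalsIgnoreCase(columnNameCopy)) {
        throw new IllegalArgumentException(
          s"Column names must match: $columnName vs $columnNameCopy")
      }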
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/641#discussion_r106091632

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---
@@ -129,4 +134,80 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     case databaseName ~ tableName ~ limit =>
       ShowLoadsCommand(convertDbNameToLowerCase(databaseName), tableName.toLowerCase(), limit)
   }
+
+  protected lazy val alterTableModifyDataType: Parser[LogicalPlan] =
+    ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ CHANGE ~ ident ~ ident ~
+      ident ~ opt("(" ~> rep1sep(valueOptions, ",") <~ ")") <~ opt(";") ^^ {
--- End diff --

Use the datatype rule from the existing parser rules; I think the rule name is PrimitiveType.
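For illustration, a standalone datatype rule built with Scala parser combinators might look like the sketch below. The rule name and the exact grammar inside CarbonDDLSqlParser are assumptions here; the point is only that a shared rule replaces the bare ident for the type position.

    import scala.util.parsing.combinator.JavaTokenParsers

    object DataTypeRule extends JavaTokenParsers {
      // One shared rule for datatypes instead of accepting any identifier.
      lazy val primitiveType: Parser[String] =
        "string" | "bigint" | "double" | "int" |
          "decimal" ~ opt("(" ~> wholeNumber ~ ("," ~> wholeNumber) <~ ")") ^^ (_ => "decimal")
    }

    // DataTypeRule.parseAll(DataTypeRule.primitiveType, "decimal(10, 2)") succeeds,
    // while a misspelled type such as "strnig" is rejected at parse time.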
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/641#discussion_r106090264

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala ---
@@ -784,4 +786,105 @@
       throw ex
     }
   }
+
+  def loadDefaultDictionaryValueForNewColumn(carbonTablePath: CarbonTablePath,
--- End diff --

Add a documentation comment describing this method.
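The comment being asked for might read roughly like the sketch below; the described behaviour is inferred from the method name and its first parameter, not verified against the implementation.

    /**
     * Writes a default dictionary value for a newly added dictionary column so
     * that segments loaded before the column existed can still resolve it.
     * (Behaviour inferred from the method name; confirm against the code.)
     */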
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/641#discussion_r106092997

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---
@@ -129,4 +134,80 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     case databaseName ~ tableName ~ limit =>
       ShowLoadsCommand(convertDbNameToLowerCase(databaseName), tableName.toLowerCase(), limit)
   }
+
+  protected lazy val alterTableModifyDataType: Parser[LogicalPlan] =
+    ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ CHANGE ~ ident ~ ident ~
+      ident ~ opt("(" ~> rep1sep(valueOptions, ",") <~ ")") <~ opt(";") ^^ {
+      case dbName ~ table ~ change ~ columnName ~ columnNameCopy ~ dataType ~ values =>
+        // both the column names should be same
+        CommonUtil.validateColumnNames(columnName, columnNameCopy)
+        val alterTableChangeDataTypeModel =
+          AlterTableDataTypeChangeModel(parseDataType(dataType.toLowerCase, values),
+            convertDbNameToLowerCase(dbName),
+            table.toLowerCase,
+            columnName.toLowerCase,
+            columnNameCopy.toLowerCase)
+        AlterTableDataTypeChange(alterTableChangeDataTypeModel)
+    }
+
+  protected lazy val alterTableAddColumns: Parser[LogicalPlan] =
+    ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~
+      (ADD ~> COLUMNS ~> "(" ~> repsep(anyFieldDef, ",") <~ ")") ~
+      (TBLPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ {
+      case dbName ~ table ~ fields ~ tblProp =>
+        fields.foreach{ f =>
+          if (isComplexDimDictionaryExclude(f.dataType.get)) {
+            throw new MalformedCarbonCommandException(
+              s"Add column is unsupported for complex datatype column: ${f.column}")
+          }
+        }
+        val tableProps = if (tblProp.isDefined) {
+          // default value should not be converted to lower case
+          val tblProps = tblProp.get.map(f => if (f._1.toLowerCase.startsWith("default.value.")) {
+            f._1 -> f._2
+          } else {
+            f._1 -> f._2.toLowerCase
+          })
+          scala.collection.mutable.Map(tblProps: _*)
+        } else {
+          scala.collection.mutable.Map.empty[String, String]
+        }
+
+        val tableModel = prepareTableModel (false,
+          convertDbNameToLowerCase(dbName),
+          table.toLowerCase,
+          fields.map(convertFieldNamesToLowercase),
+          Seq.empty,
+          tableProps,
+          None,
+          true)
+
+        val alterTableAddColumnsModel = AlterTableAddColumnsModel(convertDbNameToLowerCase(dbName),
+          table,
+          tableProps,
+          tableModel.dimCols,
+          tableModel.msrCols,
+          tableModel.highcardinalitydims.getOrElse(Seq.empty))
+        AlterTableAddColumns(alterTableAddColumnsModel)
+    }
+
+  private def convertFieldNamesToLowercase(field: Field): Field = {
+    val name = field.column.toLowerCase
+    field.copy(column = name, name = Some(name))
+  }
+
+  protected lazy val alterTableDropColumn: Parser[LogicalPlan] =
--- End diff --

Add a rule named columnlist that checks for duplicates and performs the related validation.
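A self-contained sketch of such a rule follows; the name columnList, the error message, and the use of JavaTokenParsers' ident are illustrative stand-ins for the parser's own building blocks.

    import scala.util.parsing.combinator.JavaTokenParsers

    object ColumnListRule extends JavaTokenParsers {
      // Reject duplicate column names in the grammar itself, so every ALTER TABLE
      // variant gets the validation without repeating it in each command.
      lazy val columnList: Parser[List[String]] =
        "(" ~> rep1sep(ident, ",") <~ ")" ^? ({
          case cols if cols.map(_.toLowerCase).distinct.size == cols.size => cols
        }, cols => s"duplicate column name in (${cols.mkString(", ")})")
    }

    // ColumnListRule.parseAll(ColumnListRule.columnList, "(a, b, a)") fails with
    // the duplicate-column error; "(a, b, c)" parses to List("a", "b", "c").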
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/641#discussion_r106090223

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala ---
@@ -77,4 +77,40 @@
       case DataType.STRUCT => "struct"
     }
   }
+
+  /**
+   * convert from wrapper to external data type
+   *
+   * @param dataType
+   * @return
+   */
+  def convertToThriftDataType(dataType: String): org.apache.carbondata.format.DataType = {
--- End diff --

A similar conversion function probably already exists; please try to reuse it.