GitHub user manishgupta88 opened a pull request:
https://github.com/apache/carbondata/pull/2437

[WIP] Segregate block and blocklet cache

Segregate block and blocklet cache.

- [ ] Any interfaces changed?
- [ ] Any backward compatibility impacted?
- [ ] Document update required?
- [ ] Testing done
      Please provide details on
      - Whether new unit test cases have been added or why no new tests are required?
      - How it is tested? Please attach test report.
      - Is it a performance related change? Please attach the performance test report.
      - Any additional information to help reviewers in testing this change.
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/manishgupta88/carbondata block_blocklet_cache

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2437.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2437

----

commit 8c8215da55a687f4debadd56fa9d5356729d3331
Author: manishgupta88 <tomanishgupta18@...>
Date: 2018-06-25T06:43:00Z

    segregate block and blocklet cache

----
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2437 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6718/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2437 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5545/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2437 SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5567/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2437 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5558/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2437 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6728/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2437 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5568/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2437 SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5580/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2437 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6738/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2437#discussion_r200149538 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java --- @@ -26,70 +26,66 @@ */ public abstract class DataMapRow implements Serializable { - protected CarbonRowSchema[] schemas; --- End diff -- I don't think it is required to remove the schema from here, as it is a temporary object used to read data and does not occupy any heap memory. Many of the changes related to this are not required. Please revert them. ---
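For context, a minimal sketch (simplified stand-ins, not the actual DataMapRow/DataMapRowImpl classes) of the two shapes being debated: the existing row keeps a reference to its schema array, while the PR passes only the column count and supplies the schema from outside at read time. The reviewer's point is that the row is a short-lived read-time object, so dropping the field buys little.

```java
// Original style: the row carries its own schema reference.
class RowWithSchema {
  private final String[] schemas;   // stands in for CarbonRowSchema[]
  private final Object[] data;

  RowWithSchema(String[] schemas) {
    this.schemas = schemas;
    this.data = new Object[schemas.length];
  }
}

// PR style: the row only knows its column count; the schema is passed in by the reader.
class RowWithoutSchema {
  private final Object[] data;

  RowWithoutSchema(int columnCount) {
    this.data = new Object[columnCount];
  }
}
```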
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2437#discussion_r200149739 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java --- @@ -0,0 +1,879 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.indexstore.blockletindex; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.io.UnsupportedEncodingException; --- End diff -- use `import java.io.*` --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2437#discussion_r200155521 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java --- @@ -0,0 +1,879 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.indexstore.blockletindex; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.dev.DataMapModel; +import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.indexstore.AbstractMemoryDMStore; +import org.apache.carbondata.core.indexstore.BlockMetaInfo; +import org.apache.carbondata.core.indexstore.Blocklet; +import org.apache.carbondata.core.indexstore.BlockletDetailInfo; +import org.apache.carbondata.core.indexstore.ExtendedBlocklet; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.indexstore.SafeMemoryDMStore; +import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore; +import org.apache.carbondata.core.indexstore.row.DataMapRow; +import org.apache.carbondata.core.indexstore.row.DataMapRowImpl; +import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema; +import org.apache.carbondata.core.indexstore.schema.SchemaGenerator; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex; +import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.profiler.ExplainCollector; +import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor; +import org.apache.carbondata.core.scan.filter.FilterUtil; +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.util.BlockletDataMapUtil; +import 
org.apache.carbondata.core.util.ByteUtil; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataFileFooterConverter; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.xerial.snappy.Snappy; + +/** + * Datamap implementation for blocklet. + */ +public class BlockDataMap extends CoarseGrainDataMap implements Serializable { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(BlockDataMap.class.getName()); + + protected static final long serialVersionUID = -2170289352240810993L; + /** + * for CACHE_LEVEL=BLOCK and legacy store default blocklet id will be -1 + */ + private static final short BLOCK_DEFAULT_BLOCKLET_ID = -1; + + protected AbstractMemoryDMStore memoryDMStore; + + protected AbstractMemoryDMStore taskSummaryDMStore; + + // As it is a heavy object it is not recommended to serialize this object + protected transient SegmentProperties segmentProperties; + + protected int[] columnCardinality; + + protected long blockletSchemaTime; + /** + * flag to check for store from 1.1 or any prior version + */ + protected boolean isLegacyStore; + + @Override public void init(DataMapModel dataMapModel) throws IOException, MemoryException { + long startTime = System.currentTimeMillis(); + assert (dataMapModel instanceof BlockletDataMapModel); + BlockletDataMapModel blockletDataMapInfo = (BlockletDataMapModel) dataMapModel; + DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); + List<DataFileFooter> indexInfo = fileFooterConverter + .getIndexInfo(blockletDataMapInfo.getFilePath(), blockletDataMapInfo.getFileData()); + Path path = new Path(blockletDataMapInfo.getFilePath()); + byte[] filePath = path.getParent().toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET); + byte[] fileName = path.getName().toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET); + byte[] segmentId = + blockletDataMapInfo.getSegmentId().getBytes(CarbonCommonConstants.DEFAULT_CHARSET); + byte[] schemaBinary = null; + if (!indexInfo.isEmpty()) { + DataFileFooter fileFooter = indexInfo.get(0); + // store for 1.1 or any prior version will not have any blocklet information in file footer + isLegacyStore = fileFooter.getBlockletList() == null; + // init segment properties and create schema + initSegmentProperties(fileFooter); + schemaBinary = convertSchemaToBinary(fileFooter.getColumnInTable()); + createSchema(segmentProperties, blockletDataMapInfo.isAddToUnsafe(), true); + createSummarySchema(segmentProperties, schemaBinary, filePath, fileName, segmentId, + blockletDataMapInfo.isAddToUnsafe(), true); + } + // check for legacy store and load the metadata + DataMapRowImpl summaryRow = loadBlockMetadata(blockletDataMapInfo, indexInfo); --- End diff -- Create protected methods for `createSchema`, `createSummarySchema` and `loadMetadata` and override the same in BloclkletDataMap --- |
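A rough sketch of the template-method layout being asked for here. The class and hook names follow the discussion, but the bodies are placeholders rather than the real CarbonData logic: BlockDataMap keeps the shared initialisation and exposes protected hooks, and BlockletDataMap overrides only the blocklet-specific pieces.

```java
// Shared base: block-level cache (CACHE_LEVEL=BLOCK).
class BlockDataMap {

  public void init(Object dataMapModel) {
    // common initialisation path for both cache levels
    createSchema();
    createSummarySchema();
    loadMetadata(dataMapModel);
  }

  // protected hooks that the blocklet-level datamap can specialise
  protected void createSchema() {
    // build the block-level index row schema
  }

  protected void createSummarySchema() {
    // build the task-summary schema (schema binary, file path, file name, segment id)
  }

  protected void loadMetadata(Object dataMapModel) {
    // read the index/footer information and add one index row per block
  }
}

// Blocklet-level cache (CACHE_LEVEL=BLOCKLET) reuses init() and overrides the hooks.
class BlockletDataMap extends BlockDataMap {

  @Override
  protected void createSchema() {
    // extend the block schema with blocklet-only columns, e.g. number of pages
  }

  @Override
  protected void loadMetadata(Object dataMapModel) {
    // add one index row per blocklet instead of per block
  }
}
```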
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2437#discussion_r200156093 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java --- @@ -221,438 +159,71 @@ private DataMapRowImpl loadToUnsafe(DataFileFooter fileFooter, CarbonRowSchema[] schema = memoryDMStore.getSchema(); // Add one row to maintain task level min max for segment pruning if (!blockletList.isEmpty() && summaryRow == null) { - summaryRow = new DataMapRowImpl(summaryDMStore.getSchema()); + summaryRow = new DataMapRowImpl(taskSummaryDMStore.getSchema().length); } for (int index = 0; index < blockletList.size(); index++) { - DataMapRow row = new DataMapRowImpl(schema); + DataMapRow row = new DataMapRowImpl(schema.length); int ordinal = 0; int taskMinMaxOrdinal = 0; BlockletInfo blockletInfo = blockletList.get(index); - - // add start key as index key - row.setByteArray(blockletInfo.getBlockletIndex().getBtreeIndex().getStartKey(), ordinal++); - BlockletMinMaxIndex minMaxIndex = blockletInfo.getBlockletIndex().getMinMaxIndex(); - byte[][] minValues = updateMinValues(minMaxIndex.getMinValues(), minMaxLen); - row.setRow(addMinMax(minMaxLen, schema[ordinal], minValues), ordinal); + row.setRow(addMinMax(minMaxLen, schema[ordinal], minMaxIndex.getMinValues()), ordinal); // compute and set task level min values - addTaskMinMaxValues(summaryRow, minMaxLen, - summaryDMStore.getSchema()[taskMinMaxOrdinal], minValues, - TASK_MIN_VALUES_INDEX, true); + addTaskMinMaxValues(summaryRow, minMaxLen, taskSummaryDMStore.getSchema(), taskMinMaxOrdinal, + minMaxIndex.getMinValues(), BlockletDataMapRowIndexes.TASK_MIN_VALUES_INDEX, true); ordinal++; taskMinMaxOrdinal++; - byte[][] maxValues = updateMaxValues(minMaxIndex.getMaxValues(), minMaxLen); - row.setRow(addMinMax(minMaxLen, schema[ordinal], maxValues), ordinal); + row.setRow(addMinMax(minMaxLen, schema[ordinal], minMaxIndex.getMaxValues()), ordinal); // compute and set task level max values - addTaskMinMaxValues(summaryRow, minMaxLen, - summaryDMStore.getSchema()[taskMinMaxOrdinal], maxValues, - TASK_MAX_VALUES_INDEX, false); + addTaskMinMaxValues(summaryRow, minMaxLen, taskSummaryDMStore.getSchema(), taskMinMaxOrdinal, + minMaxIndex.getMaxValues(), BlockletDataMapRowIndexes.TASK_MAX_VALUES_INDEX, false); ordinal++; - row.setInt(blockletInfo.getNumberOfRows(), ordinal++); - // add file path byte[] filePathBytes = filePath.getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS); row.setByteArray(filePathBytes, ordinal++); - - // add pages - row.setShort((short) blockletInfo.getNumberOfPages(), ordinal++); - // add version number row.setShort(fileFooter.getVersionId().number(), ordinal++); - // add schema updated time row.setLong(fileFooter.getSchemaUpdatedTimeStamp(), ordinal++); - - // add blocklet info byte[] serializedData; try { + // Add block footer offset, it is used if we need to read footer of block + row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), ordinal++); + setLocations(blockMetaInfo.getLocationInfo(), row, ordinal++); + // Store block size + row.setLong(blockMetaInfo.getSize(), ordinal++); + // add blocklet info ByteArrayOutputStream stream = new ByteArrayOutputStream(); DataOutput dataOutput = new DataOutputStream(stream); blockletInfo.write(dataOutput); serializedData = stream.toByteArray(); row.setByteArray(serializedData, ordinal++); - // Add block footer offset, it is used if we need to read footer of block - row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), ordinal++); - 
setLocations(blockMetaInfo.getLocationInfo(), row, ordinal); - ordinal++; - // for relative blockelt id i.e blocklet id that belongs to a particular part file - row.setShort((short) relativeBlockletId++, ordinal++); - // Store block size - row.setLong(blockMetaInfo.getSize(), ordinal); + // add pages + row.setShort((short) blockletInfo.getNumberOfPages(), ordinal++); + // for relative blocklet id i.e blocklet id that belongs to a particular carbondata file + row.setShort((short) relativeBlockletId++, ordinal); memoryDMStore.addIndexRow(row); } catch (Exception e) { throw new RuntimeException(e); } } - return summaryRow; } - private void setLocations(String[] locations, DataMapRow row, int ordinal) - throws UnsupportedEncodingException { - // Add location info - String locationStr = StringUtils.join(locations, ','); - row.setByteArray(locationStr.getBytes(CarbonCommonConstants.DEFAULT_CHARSET), ordinal); - } - - /** - * Load information for the block.It is the case can happen only for old stores - * where blocklet information is not available in index file. So load only block information - * and read blocklet information in executor. - */ - private DataMapRowImpl loadToUnsafeBlock(DataFileFooter fileFooter, - SegmentProperties segmentProperties, String filePath, DataMapRowImpl summaryRow, - BlockMetaInfo blockMetaInfo) { - int[] minMaxLen = segmentProperties.getColumnsValueSize(); - BlockletIndex blockletIndex = fileFooter.getBlockletIndex(); - CarbonRowSchema[] schema = memoryDMStore.getSchema(); - // Add one row to maintain task level min max for segment pruning - if (summaryRow == null) { - summaryRow = new DataMapRowImpl(summaryDMStore.getSchema()); - } - DataMapRow row = new DataMapRowImpl(schema); - int ordinal = 0; - int taskMinMaxOrdinal = 0; - // add start key as index key - row.setByteArray(blockletIndex.getBtreeIndex().getStartKey(), ordinal++); - - BlockletMinMaxIndex minMaxIndex = blockletIndex.getMinMaxIndex(); - byte[][] minValues = updateMinValues(minMaxIndex.getMinValues(), minMaxLen); - byte[][] maxValues = updateMaxValues(minMaxIndex.getMaxValues(), minMaxLen); - // update min max values in case of old store - byte[][] updatedMinValues = - CarbonUtil.updateMinMaxValues(fileFooter, maxValues, minValues, true); - byte[][] updatedMaxValues = - CarbonUtil.updateMinMaxValues(fileFooter, maxValues, minValues, false); - row.setRow(addMinMax(minMaxLen, schema[ordinal], updatedMinValues), ordinal); - // compute and set task level min values - addTaskMinMaxValues(summaryRow, minMaxLen, - summaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMinValues, - TASK_MIN_VALUES_INDEX, true); - ordinal++; - taskMinMaxOrdinal++; - row.setRow(addMinMax(minMaxLen, schema[ordinal], updatedMaxValues), ordinal); - // compute and set task level max values - addTaskMinMaxValues(summaryRow, minMaxLen, - summaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMaxValues, - TASK_MAX_VALUES_INDEX, false); - ordinal++; - - row.setInt((int)fileFooter.getNumberOfRows(), ordinal++); - - // add file path - byte[] filePathBytes = filePath.getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS); - row.setByteArray(filePathBytes, ordinal++); - - // add pages - row.setShort((short) 0, ordinal++); - - // add version number - row.setShort(fileFooter.getVersionId().number(), ordinal++); - - // add schema updated time - row.setLong(fileFooter.getSchemaUpdatedTimeStamp(), ordinal++); - - // add blocklet info - row.setByteArray(new byte[0], ordinal++); - - 
row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), ordinal++); - try { - setLocations(blockMetaInfo.getLocationInfo(), row, ordinal); - ordinal++; - // for relative blocklet id. Value is -1 because in case of old store blocklet info will - // not be present in the index file and in that case we will not knwo the total number of - // blocklets - row.setShort((short) -1, ordinal++); - - // store block size - row.setLong(blockMetaInfo.getSize(), ordinal); - memoryDMStore.addIndexRow(row); - } catch (Exception e) { - throw new RuntimeException(e); - } - - return summaryRow; - } - - private void addTaskSummaryRowToUnsafeMemoryStore(DataMapRow summaryRow, byte[] schemaBinary, - byte[] filePath, byte[] fileName, byte[] segmentId) { - // write the task summary info to unsafe memory store - if (null != summaryRow) { - // Add column schema , it is useful to generate segment properties in executor. - // So we no need to read footer again there. - if (schemaBinary != null) { - summaryRow.setByteArray(schemaBinary, SCHEMA); - } - summaryRow.setByteArray(filePath, INDEX_PATH); - summaryRow.setByteArray(fileName, INDEX_FILE_NAME); - summaryRow.setByteArray(segmentId, SEGMENTID); - try { - summaryDMStore.addIndexRow(summaryRow); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - } - - /** - * Fill the measures min values with minimum , this is needed for backward version compatability - * as older versions don't store min values for measures - */ - private byte[][] updateMinValues(byte[][] minValues, int[] minMaxLen) { - byte[][] updatedValues = minValues; - if (minValues.length < minMaxLen.length) { - updatedValues = new byte[minMaxLen.length][]; - System.arraycopy(minValues, 0, updatedValues, 0, minValues.length); - List<CarbonMeasure> measures = segmentProperties.getMeasures(); - ByteBuffer buffer = ByteBuffer.allocate(8); - for (int i = 0; i < measures.size(); i++) { - buffer.rewind(); - DataType dataType = measures.get(i).getDataType(); - if (dataType == DataTypes.BYTE) { - buffer.putLong(Byte.MIN_VALUE); - updatedValues[minValues.length + i] = buffer.array().clone(); - } else if (dataType == DataTypes.SHORT) { - buffer.putLong(Short.MIN_VALUE); - updatedValues[minValues.length + i] = buffer.array().clone(); - } else if (dataType == DataTypes.INT) { - buffer.putLong(Integer.MIN_VALUE); - updatedValues[minValues.length + i] = buffer.array().clone(); - } else if (dataType == DataTypes.LONG) { - buffer.putLong(Long.MIN_VALUE); - updatedValues[minValues.length + i] = buffer.array().clone(); - } else if (DataTypes.isDecimal(dataType)) { - updatedValues[minValues.length + i] = - DataTypeUtil.bigDecimalToByte(BigDecimal.valueOf(Long.MIN_VALUE)); - } else { - buffer.putDouble(Double.MIN_VALUE); - updatedValues[minValues.length + i] = buffer.array().clone(); - } - } - } - return updatedValues; - } - - /** - * Fill the measures max values with maximum , this is needed for backward version compatability - * as older versions don't store max values for measures - */ - private byte[][] updateMaxValues(byte[][] maxValues, int[] minMaxLen) { - byte[][] updatedValues = maxValues; - if (maxValues.length < minMaxLen.length) { - updatedValues = new byte[minMaxLen.length][]; - System.arraycopy(maxValues, 0, updatedValues, 0, maxValues.length); - List<CarbonMeasure> measures = segmentProperties.getMeasures(); - ByteBuffer buffer = ByteBuffer.allocate(8); - for (int i = 0; i < measures.size(); i++) { - buffer.rewind(); - DataType dataType = measures.get(i).getDataType(); - if 
(dataType == DataTypes.BYTE) { - buffer.putLong(Byte.MAX_VALUE); - updatedValues[maxValues.length + i] = buffer.array().clone(); - } else if (dataType == DataTypes.SHORT) { - buffer.putLong(Short.MAX_VALUE); - updatedValues[maxValues.length + i] = buffer.array().clone(); - } else if (dataType == DataTypes.INT) { - buffer.putLong(Integer.MAX_VALUE); - updatedValues[maxValues.length + i] = buffer.array().clone(); - } else if (dataType == DataTypes.LONG) { - buffer.putLong(Long.MAX_VALUE); - updatedValues[maxValues.length + i] = buffer.array().clone(); - } else if (DataTypes.isDecimal(dataType)) { - updatedValues[maxValues.length + i] = - DataTypeUtil.bigDecimalToByte(BigDecimal.valueOf(Long.MAX_VALUE)); - } else { - buffer.putDouble(Double.MAX_VALUE); - updatedValues[maxValues.length + i] = buffer.array().clone(); - } - } - } - return updatedValues; - } - - private DataMapRow addMinMax(int[] minMaxLen, CarbonRowSchema carbonRowSchema, - byte[][] minValues) { - CarbonRowSchema[] minSchemas = - ((CarbonRowSchema.StructCarbonRowSchema) carbonRowSchema).getChildSchemas(); - DataMapRow minRow = new DataMapRowImpl(minSchemas); - int minOrdinal = 0; - // min value adding - for (int i = 0; i < minMaxLen.length; i++) { - minRow.setByteArray(minValues[i], minOrdinal++); - } - return minRow; - } - - /** - * This method will compute min/max values at task level - * - * @param taskMinMaxRow - * @param minMaxLen - * @param carbonRowSchema - * @param minMaxValue - * @param ordinal - * @param isMinValueComparison - */ - private void addTaskMinMaxValues(DataMapRow taskMinMaxRow, int[] minMaxLen, - CarbonRowSchema carbonRowSchema, byte[][] minMaxValue, int ordinal, - boolean isMinValueComparison) { - DataMapRow row = taskMinMaxRow.getRow(ordinal); - byte[][] updatedMinMaxValues = minMaxValue; - if (null == row) { - CarbonRowSchema[] minSchemas = - ((CarbonRowSchema.StructCarbonRowSchema) carbonRowSchema).getChildSchemas(); - row = new DataMapRowImpl(minSchemas); - } else { - byte[][] existingMinMaxValues = getMinMaxValue(taskMinMaxRow, ordinal); - // Compare and update min max values - for (int i = 0; i < minMaxLen.length; i++) { - int compare = - ByteUtil.UnsafeComparer.INSTANCE.compareTo(existingMinMaxValues[i], minMaxValue[i]); - if (isMinValueComparison) { - if (compare < 0) { - updatedMinMaxValues[i] = existingMinMaxValues[i]; - } - } else if (compare > 0) { - updatedMinMaxValues[i] = existingMinMaxValues[i]; - } - } - } - int minMaxOrdinal = 0; - // min/max value adding - for (int i = 0; i < minMaxLen.length; i++) { - row.setByteArray(updatedMinMaxValues[i], minMaxOrdinal++); - } - taskMinMaxRow.setRow(row, ordinal); - } - - private void createSchema(SegmentProperties segmentProperties, boolean addToUnsafe) - throws MemoryException { - List<CarbonRowSchema> indexSchemas = new ArrayList<>(); - - // Index key - indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY)); - getMinMaxSchema(segmentProperties, indexSchemas); - - // for number of rows. - indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.INT)); - - // for table block path - indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY)); - - // for number of pages. - indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT)); - - // for version number. - indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT)); - - // for schema updated time. 
- indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG)); - - //for blocklet info - indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY)); - - // for block footer offset. - indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG)); - - // for locations - indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY)); - - // for relative blocklet id i.e. blocklet id that belongs to a particular part file. - indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT)); - - // for storing block length. - indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG)); - - CarbonRowSchema[] schema = indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]); - memoryDMStore = getMemoryDMStore(schema, addToUnsafe); - } - - /** - * Creates the schema to store summary information or the information which can be stored only - * once per datamap. It stores datamap level max/min of each column and partition information of - * datamap - * @param segmentProperties - * @throws MemoryException - */ - private void createSummarySchema(SegmentProperties segmentProperties, byte[] schemaBinary, - byte[] filePath, byte[] fileName, byte[] segmentId, boolean addToUnsafe) - throws MemoryException { - List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>(); - getMinMaxSchema(segmentProperties, taskMinMaxSchemas); - // for storing column schema - taskMinMaxSchemas.add( - new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, schemaBinary.length)); - // for storing file path - taskMinMaxSchemas.add( - new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, filePath.length)); - // for storing file name - taskMinMaxSchemas.add( - new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, fileName.length)); - // for storing segmentid - taskMinMaxSchemas.add( - new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, segmentId.length)); - CarbonRowSchema[] schema = - taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()]); - summaryDMStore = getMemoryDMStore(schema, addToUnsafe); - } - - private void getMinMaxSchema(SegmentProperties segmentProperties, - List<CarbonRowSchema> minMaxSchemas) { - // Index key - int[] minMaxLen = segmentProperties.getColumnsValueSize(); - // do it 2 times, one for min and one for max. 
- for (int k = 0; k < 2; k++) { - CarbonRowSchema[] mapSchemas = new CarbonRowSchema[minMaxLen.length]; - for (int i = 0; i < minMaxLen.length; i++) { - if (minMaxLen[i] <= 0) { - boolean isVarchar = false; - if (i < segmentProperties.getDimensions().size() - && segmentProperties.getDimensions().get(i).getDataType() == DataTypes.VARCHAR) { - isVarchar = true; - } - mapSchemas[i] = new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY, - isVarchar); - } else { - mapSchemas[i] = - new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, minMaxLen[i]); - } - } - CarbonRowSchema mapSchema = - new CarbonRowSchema.StructCarbonRowSchema(DataTypes.createDefaultStructType(), - mapSchemas); - minMaxSchemas.add(mapSchema); - } - } - - @Override - public boolean isScanRequired(FilterResolverIntf filterExp) { - FilterExecuter filterExecuter = - FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null); - for (int i = 0; i < summaryDMStore.getRowCount(); i++) { - DataMapRow unsafeRow = summaryDMStore.getDataMapRow(i); - boolean isScanRequired = FilterExpressionProcessor.isScanRequired( - filterExecuter, getMinMaxValue(unsafeRow, TASK_MAX_VALUES_INDEX), - getMinMaxValue(unsafeRow, TASK_MIN_VALUES_INDEX)); - if (isScanRequired) { - return true; - } - } - return false; - } - private List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties) { if (memoryDMStore.getRowCount() == 0) { return new ArrayList<>(); } List<Blocklet> blocklets = new ArrayList<>(); + CarbonRowSchema[] schema = memoryDMStore.getSchema(); --- End diff -- Use the prune from super class --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2437#discussion_r200157458 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapRowIndexes.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.indexstore.blockletindex; + +/** + * holder for blocklet info indexes in a DataMap row + */ +public class BlockletDataMapRowIndexes { --- End diff -- Change to interface --- |
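A sketch of the change requested above: a pure constants holder reads more naturally as an interface, where every field is implicitly public static final and no private constructor is needed. The ordinal values shown are illustrative assumptions, not the real row layout.

```java
// Hypothetical ordinals for the DataMap row layout; the real values live in
// org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapRowIndexes.
public interface BlockletDataMapRowIndexes {
  int MIN_VALUES_INDEX = 0;
  int MAX_VALUES_INDEX = 1;
  int ROW_COUNT_INDEX = 2;
  int FILE_PATH_INDEX = 3;
  int VERSION_INDEX = 4;
  // ... remaining row ordinals
}
```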
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2437 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5608/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2437 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5611/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2437 Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5598/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2437 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6776/ --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on the issue:
https://github.com/apache/carbondata/pull/2437 @ravipesala handled the review comments. Kindly review and merge. ---
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2437 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6796/ --- |