GitHub user QiangCai opened a pull request:
https://github.com/apache/carbondata/pull/3001

[Presto][Streaming] support presto read streaming table

Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily:

- [ ] Any interfaces changed?
- [ ] Any backward compatibility impacted?
- [ ] Document update required?
- [ ] Testing done
  Please provide details on
  - Whether new unit test cases have been added or why no new tests are required?
  - How it is tested? Please attach test report.
  - Is it a performance related change? Please attach the performance test report.
  - Any additional information to help reviewers in testing this change.
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/QiangCai/carbondata presto_streaming

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/3001.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3001

----
commit ba7129c79dfe4f40827c609aadd261ca5f19cb67
Author: QiangCai <qiangcai@...>
Date: 2018-12-19T05:25:36Z

    presto read streaming table
----
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3001 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1844/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3001 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1847/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3001 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2052/ ---
Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3001#discussion_r242810517 --- Diff: integration/presto/src/main/java/org/apache/carbondata/presto/streaming/CarbondataStreamPageSource.java --- @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto.streaming; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.Objects; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.cache.dictionary.Dictionary; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.datatype.StructField; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.scan.model.ProjectionDimension; +import org.apache.carbondata.core.scan.model.ProjectionMeasure; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl; +import org.apache.carbondata.hadoop.CarbonInputSplit; +import org.apache.carbondata.presto.CarbonDictionaryDecodeReadSupport; +import org.apache.carbondata.presto.CarbonVectorBatch; +import org.apache.carbondata.presto.CarbondataSplit; +import org.apache.carbondata.presto.impl.CarbonLocalInputSplit; +import org.apache.carbondata.presto.readers.PrestoVectorBlockBuilder; +import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException; + +import com.facebook.presto.hadoop.$internal.com.google.common.base.Throwables; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorPageSource; +import com.facebook.presto.spi.Page; +import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.block.Block; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.log4j.Logger; + +/** + * Carbondata Page Source class for custom Carbondata RecordSet Iteration. + */ +public class CarbondataStreamPageSource implements ConnectorPageSource { --- End diff -- Existing CarbondataPageSource only can be used ? why new pageSource ? we can have private AbstractRecordReader recordReader instead of PrestoCarbonVectorizedRecordReader --- |
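A minimal sketch of the suggestion above, using stand-in names (CarbonPageReader, CarbonPageSource, and Page are placeholders here, not the connector's actual classes): a single page source holds only an abstract reader field, so the existing columnar reader and a streaming reader can be swapped behind it without adding a second PageSource implementation.

```java
import java.io.Closeable;
import java.io.IOException;

final class PageSourceSketch {
  // Stand-in for Presto's Page; just enough for the sketch.
  static final class Page { }

  // Stand-in for an AbstractRecordReader-style base type shared by both readers.
  interface CarbonPageReader extends Closeable {
    boolean nextBatch() throws IOException;
    Page currentPage();
  }

  // Reader over the normal columnar CarbonData files.
  static final class ColumnarFileReader implements CarbonPageReader {
    @Override public boolean nextBatch() { return false; }     // would decode columnar blocklets
    @Override public Page currentPage() { return new Page(); }
    @Override public void close() { }
  }

  // Reader over the row-format streaming files.
  static final class StreamingFileReader implements CarbonPageReader {
    @Override public boolean nextBatch() { return false; }     // would decode stream blocklets row by row
    @Override public Page currentPage() { return new Page(); }
    @Override public void close() { }
  }

  // One page source, holding only the abstract reader (the "private AbstractRecordReader" idea).
  static final class CarbonPageSource {
    private final CarbonPageReader reader;

    CarbonPageSource(CarbonPageReader reader) { this.reader = reader; }

    Page getNextPage() throws IOException {
      return reader.nextBatch() ? reader.currentPage() : null;
    }

    void close() throws IOException { reader.close(); }
  }

  public static void main(String[] args) throws IOException {
    // The same page source works for both file formats; only the injected reader differs.
    CarbonPageSource batch = new CarbonPageSource(new ColumnarFileReader());
    CarbonPageSource stream = new CarbonPageSource(new StreamingFileReader());
    System.out.println(batch.getNextPage() + " / " + stream.getNextPage());
    batch.close();
    stream.close();
  }
}
```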
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3001 Build Failed with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10102/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3001 Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2055/ ---
Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3001#discussion_r242812393 --- Diff: integration/presto/src/main/java/org/apache/carbondata/presto/streaming/PrestoCarbonStreamRecordReader.java --- @@ -0,0 +1,617 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto.streaming; + +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.BitSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.core.cache.Cache; +import org.apache.carbondata.core.cache.CacheProvider; +import org.apache.carbondata.core.cache.CacheType; +import org.apache.carbondata.core.cache.dictionary.Dictionary; +import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; +import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.reader.CarbonHeaderReader; +import org.apache.carbondata.core.scan.complextypes.ArrayQueryType; +import org.apache.carbondata.core.scan.complextypes.PrimitiveQueryType; +import org.apache.carbondata.core.scan.complextypes.StructQueryType; +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; +import org.apache.carbondata.core.scan.filter.FilterUtil; +import org.apache.carbondata.core.scan.filter.GenericQueryType; +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import org.apache.carbondata.core.scan.filter.intf.RowImpl; +import org.apache.carbondata.core.scan.filter.intf.RowIntf; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.util.CarbonMetadataUtil; +import 
org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.format.BlockletHeader; +import org.apache.carbondata.format.FileHeader; +import org.apache.carbondata.hadoop.CarbonInputSplit; +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; + +public class PrestoCarbonStreamRecordReader extends RecordReader<Void, Object> { + + public static final String READ_BUFFER_SIZE = "carbon.stream.read.buffer.size"; + public static final String READ_BUFFER_SIZE_DEFAULT = "65536"; + + // metadata + private CarbonTable carbonTable; + private CarbonColumn[] storageColumns; + private boolean[] isRequired; + private DataType[] measureDataTypes; + private int dimensionCount; + private int measureCount; + + // input + private FileSplit fileSplit; + private Configuration hadoopConf; + private PrestoStreamBlockletReader input; + private boolean isFirstRow = true; + private QueryModel model; + + // decode data + private BitSet allNonNull; + private boolean[] isNoDictColumn; + private DirectDictionaryGenerator[] directDictionaryGenerators; + private CacheProvider cacheProvider; + private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache; + private GenericQueryType[] queryTypes; + private String compressorName; + + // vectorized reader + private boolean isFinished = false; + + // filter + private FilterExecuter filter; + private boolean[] isFilterRequired; + private Object[] filterValues; + private RowIntf filterRow; + private int[] filterMap; + + // output + private CarbonColumn[] projection; + private boolean[] isProjectionRequired; + private int[] projectionMap; + private Object[] outputValues; + + // empty project, null filter + private boolean skipScanData; + + public PrestoCarbonStreamRecordReader(QueryModel mdl) { + this.model = mdl; + + } + @Override public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException { + // input + if (split instanceof CarbonInputSplit) { + fileSplit = (CarbonInputSplit) split; + } else if (split instanceof CarbonMultiBlockSplit) { + fileSplit = ((CarbonMultiBlockSplit) split).getAllSplits().get(0); + } else { + fileSplit = (FileSplit) split; + } + + // metadata + hadoopConf = context.getConfiguration(); + if (model == null) { + CarbonTableInputFormat format = new CarbonTableInputFormat<Object>(); + model = format.createQueryModel(split, context); + } + carbonTable = model.getTable(); + List<CarbonDimension> dimensions = + carbonTable.getDimensionByTableName(carbonTable.getTableName()); + dimensionCount = dimensions.size(); + List<CarbonMeasure> measures = + carbonTable.getMeasureByTableName(carbonTable.getTableName()); + measureCount = measures.size(); + List<CarbonColumn> carbonColumnList = + carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName()); + storageColumns = carbonColumnList.toArray(new CarbonColumn[carbonColumnList.size()]); + isNoDictColumn = CarbonDataProcessorUtil.getNoDictionaryMapping(storageColumns); + directDictionaryGenerators = 
new DirectDictionaryGenerator[storageColumns.length]; + for (int i = 0; i < storageColumns.length; i++) { + if (storageColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) { + directDictionaryGenerators[i] = DirectDictionaryKeyGeneratorFactory + .getDirectDictionaryGenerator(storageColumns[i].getDataType()); + } + } + measureDataTypes = new DataType[measureCount]; + for (int i = 0; i < measureCount; i++) { + measureDataTypes[i] = storageColumns[dimensionCount + i].getDataType(); + } + + // decode data + allNonNull = new BitSet(storageColumns.length); + projection = model.getProjectionColumns(); + + isRequired = new boolean[storageColumns.length]; + boolean[] isFiltlerDimensions = model.getIsFilterDimensions(); + boolean[] isFiltlerMeasures = model.getIsFilterMeasures(); + isFilterRequired = new boolean[storageColumns.length]; + filterMap = new int[storageColumns.length]; + for (int i = 0; i < storageColumns.length; i++) { + if (storageColumns[i].isDimension()) { + if (isFiltlerDimensions[storageColumns[i].getOrdinal()]) { + isRequired[i] = true; + isFilterRequired[i] = true; + filterMap[i] = storageColumns[i].getOrdinal(); + } + } else { + if (isFiltlerMeasures[storageColumns[i].getOrdinal()]) { + isRequired[i] = true; + isFilterRequired[i] = true; + filterMap[i] = carbonTable.getDimensionOrdinalMax() + storageColumns[i].getOrdinal(); + } + } + } + + isProjectionRequired = new boolean[storageColumns.length]; + projectionMap = new int[storageColumns.length]; + for (int j = 0; j < projection.length; j++) { + for (int i = 0; i < storageColumns.length; i++) { + if (storageColumns[i].getColName().equals(projection[j].getColName())) { + isRequired[i] = true; + isProjectionRequired[i] = true; + projectionMap[i] = j; + break; + } + } + } + + // initialize filter + if (null != model.getFilterExpressionResolverTree()) { + initializeFilter(); + } else if (projection.length == 0) { + skipScanData = true; + } + + } + + private void initializeFilter() { + List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil + .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()), + carbonTable.getMeasureByTableName(carbonTable.getTableName())); + int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()]; + for (int i = 0; i < dimLensWithComplex.length; i++) { + dimLensWithComplex[i] = Integer.MAX_VALUE; + } + + int[] dictionaryColumnCardinality = + CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList); + SegmentProperties segmentProperties = + new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality); + Map<Integer, GenericQueryType> complexDimensionInfoMap = new HashMap<>(); + + FilterResolverIntf resolverIntf = model.getFilterExpressionResolverTree(); + filter = FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties, + complexDimensionInfoMap); + // for row filter, we need update column index + FilterUtil.updateIndexOfColumnExpression(resolverIntf.getFilterExpression(), + carbonTable.getDimensionOrdinalMax()); + + } + + private byte[] getSyncMarker(String filePath) throws IOException { + CarbonHeaderReader headerReader = new CarbonHeaderReader(filePath); + FileHeader header = headerReader.readHeader(); + // legacy store does not have this member + if (header.isSetCompressor_name()) { + compressorName = header.getCompressor_name(); + } else { + compressorName = CompressorFactory.NativeSupportedCompressor.SNAPPY.getName(); + } + return header.getSync_marker(); + } + + private void initializeAtFirstRow() throws IOException { + 
filterValues = new Object[carbonTable.getDimensionOrdinalMax() + measureCount]; + filterRow = new RowImpl(); + filterRow.setValues(filterValues); + + outputValues = new Object[projection.length]; + + Path file = fileSplit.getPath(); + + byte[] syncMarker = getSyncMarker(file.toString()); + + FileSystem fs = file.getFileSystem(hadoopConf); + + int bufferSize = Integer.parseInt(hadoopConf.get(READ_BUFFER_SIZE, READ_BUFFER_SIZE_DEFAULT)); + + FSDataInputStream fileIn = fs.open(file, bufferSize); + fileIn.seek(fileSplit.getStart()); + input = new PrestoStreamBlockletReader(syncMarker, fileIn, fileSplit.getLength(), + fileSplit.getStart() == 0, compressorName); + + cacheProvider = CacheProvider.getInstance(); + cache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY); + queryTypes = getComplexDimensions(carbonTable, storageColumns, cache); + } + + @Override public boolean nextKeyValue() throws IOException, InterruptedException { + if (isFirstRow) { + isFirstRow = false; + initializeAtFirstRow(); + } + if (isFinished) { + return false; + } + + return nextRow(); --- End diff -- presto supports row by row processing ? I think we have to retain vectorRead flow not the row flow. --- |
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3001#discussion_r243164638

--- Diff: integration/presto/src/main/java/org/apache/carbondata/presto/streaming/CarbondataStreamPageSource.java (same hunk as quoted in the review comment above) --- End diff --

The streaming files are in row format, so it is hard to lazily load the data of each column.

---
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3001#discussion_r243176527

--- Diff: integration/presto/src/main/java/org/apache/carbondata/presto/streaming/CarbondataStreamPageSource.java (same hunk as quoted in the review comment above) --- End diff --

I have already refactored the old record reader, so Presto can now reuse it.

---
Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3001#discussion_r243176905

--- Diff: integration/presto/src/main/java/org/apache/carbondata/presto/streaming/PrestoCarbonStreamRecordReader.java (same hunk as quoted in the review comment above) --- End diff --

The stream file is in row format, so it does not have a vector flow. I have already removed this file and now just reuse the old record reader.

---
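A rough sketch of the point above, with placeholder types rather than the PR's classes: a stream file is decoded row by row, but the connector can still hand out Presto-style columnar pages by draining a batch of rows into per-column buffers before emitting them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class RowToColumnarSketch {
  static final int BATCH_SIZE = 4;

  // Drain up to BATCH_SIZE rows into per-column buffers ("column vectors" in the real code).
  // Returns null once the row source is exhausted.
  static List<List<Object>> nextPage(Iterator<Object[]> rows, int columnCount) {
    List<List<Object>> columns = new ArrayList<>();
    for (int c = 0; c < columnCount; c++) {
      columns.add(new ArrayList<>());
    }
    int filled = 0;
    while (filled < BATCH_SIZE && rows.hasNext()) {
      Object[] row = rows.next();                  // row-wise decode, like the stream reader
      for (int c = 0; c < columnCount; c++) {
        columns.get(c).add(row[c]);                // scatter each value into its column buffer
      }
      filled++;
    }
    return filled == 0 ? null : columns;
  }

  public static void main(String[] args) {
    List<Object[]> rows = Arrays.asList(
        new Object[] {1, "a"}, new Object[] {2, "b"},
        new Object[] {3, "c"}, new Object[] {4, "d"}, new Object[] {5, "e"});
    Iterator<Object[]> it = rows.iterator();
    List<List<Object>> page;
    while ((page = nextPage(it, 2)) != null) {
      System.out.println("page: " + page);         // prints a 4-row page, then a 1-row page
    }
  }
}
```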
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3001 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1866/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3001 Build Success with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10121/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3001 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2074/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3001 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1870/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3001 Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2079/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3001 Build Failed with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10125/ ---
Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3001#discussion_r243951568

--- Diff: integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java ---
@@ -95,6 +95,9 @@ private CarbonColumnVectorImpl createDirectStreamReader(int batchSize, DataType
     } else if (dataType == DataTypes.STRING) {
       return new SliceStreamReader(batchSize, field.getDataType(), dictionaryBlock);
     } else if (DataTypes.isDecimal(dataType)) {
+      if (dictionary != null && dataType instanceof DecimalType) {
--- End diff --

If it is inside the DataTypes.isDecimal check, it is already a decimal type. So why is this change required?

---
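A small self-contained illustration of the question above (hypothetical types, not CarbonData's or Presto's API): once an isDecimal-style check has matched, an extra instanceof DecimalType test inside that branch can only change the outcome if isDecimal can be true for other classes, so the dictionary null check appears to be the only part of the added condition that matters.

```java
final class DecimalCheckSketch {
  interface DataType { }
  static final class DecimalType implements DataType { }
  static final class StringType implements DataType { }

  // Stand-in for a DataTypes.isDecimal(...) helper: true only for DecimalType instances.
  static boolean isDecimal(DataType type) {
    return type instanceof DecimalType;
  }

  static String chooseReader(DataType type, Object dictionary) {
    if (isDecimal(type)) {
      // The reviewer's point: 'type instanceof DecimalType' is already implied here,
      // so only the 'dictionary != null' half of the added condition can change behaviour.
      if (dictionary != null && type instanceof DecimalType) {
        return "dictionary-decoded decimal reader";
      }
      return "plain decimal reader";
    }
    return "other reader";
  }

  public static void main(String[] args) {
    System.out.println(chooseReader(new DecimalType(), new Object())); // dictionary-decoded decimal reader
    System.out.println(chooseReader(new DecimalType(), null));         // plain decimal reader
    System.out.println(chooseReader(new StringType(), null));          // other reader
  }
}
```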
Github user jackylk commented on the issue:
https://github.com/apache/carbondata/pull/3001 @QiangCai Can you explain the dependency between the readers after this PR? Ideally, the presto module should depend on the streaming reader in the core or hadoop modules. ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/3001 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1962/ ---