Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2869#discussion_r229563643

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java ---
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalType;
+import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.scan.executor.QueryExecutor;
+import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
+import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.model.ProjectionDimension;
+import org.apache.carbondata.core.scan.model.ProjectionMeasure;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.hadoop.AbstractRecordReader;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+
+/**
+ * A specialized RecordReader that reads into CarbonColumnarBatches directly using the
+ * carbondata column APIs and fills the data directly into columns.
+ */
+public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonVectorizedRecordReader.class.getName());
+
+  private CarbonColumnarBatch carbonColumnarBatch;
+
+  private QueryExecutor queryExecutor;
+
+  private int batchIdx = 0;
+
+  private int numBatched = 0;
+
+  private AbstractDetailQueryResultIterator iterator;
+
+  private QueryModel queryModel;
+
+  public CarbonVectorizedRecordReader(QueryModel queryModel) {
+    this.queryModel = queryModel;
+  }
+
+  @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+      throws IOException, InterruptedException {
+    List<CarbonInputSplit> splitList;
+    if (inputSplit instanceof CarbonInputSplit) {
+      splitList = new ArrayList<>(1);
+      splitList.add((CarbonInputSplit) inputSplit);
+    } else {
+      throw new RuntimeException("unsupported input split type: " + inputSplit);
+    }
+    List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
+    queryModel.setTableBlockInfos(tableBlockInfoList);
+    queryModel.setVectorReader(true);
+    try {
+      queryExecutor =
+          QueryExecutorFactory.getQueryExecutor(queryModel, taskAttemptContext.getConfiguration());
+      iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel);
+    } catch (QueryExecutionException e) {
+      LOGGER.error(e);
+      throw new InterruptedException(e.getMessage());
+    } catch (Exception e) {
+      LOGGER.error(e);
+      throw e;
+    }
+  }
+
+  @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+    initBatch();
+    if (batchIdx >= numBatched) {
+      if (!nextBatch()) return false;
+    }
+    ++batchIdx;
+    return true;
+  }
+
+  private boolean nextBatch() {
+    carbonColumnarBatch.reset();
+    if (iterator.hasNext()) {
+      iterator.processNextBatch(carbonColumnarBatch);
+      numBatched = carbonColumnarBatch.getActualSize();
+      batchIdx = 0;
+      return true;
+    }
+    return false;
+  }
+
+  private void initBatch() {
+    if (carbonColumnarBatch == null) {
+      List<ProjectionDimension> queryDimension = queryModel.getProjectionDimensions();
+      List<ProjectionMeasure> queryMeasures = queryModel.getProjectionMeasures();
+      StructField[] fields = new StructField[queryDimension.size() + queryMeasures.size()];
+      for (ProjectionDimension dim : queryDimension) {
+        fields[dim.getOrdinal()] =
+            new StructField(dim.getColumnName(), dim.getDimension().getDataType());
+      }
+      for (ProjectionMeasure msr : queryMeasures) {
+        DataType dataType = msr.getMeasure().getDataType();
+        if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT
+            || dataType == DataTypes.INT || dataType == DataTypes.LONG
+            || dataType == DataTypes.FLOAT || dataType == DataTypes.BYTE) {
+          fields[msr.getOrdinal()] =
+              new StructField(msr.getColumnName(), msr.getMeasure().getDataType());
+        } else if (DataTypes.isDecimal(dataType)) {
+          fields[msr.getOrdinal()] = new StructField(msr.getColumnName(),
+              new DecimalType(msr.getMeasure().getPrecision(), msr.getMeasure().getScale()));
+        } else {
+          fields[msr.getOrdinal()] = new StructField(msr.getColumnName(), DataTypes.DOUBLE);
+        }
+      }
+      CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
+      for (int i = 0; i < fields.length; i++) {
+        vectors[i] = new CarbonColumnVectorImpl(
+            CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT,
+            fields[i].getDataType());
+      }
+      carbonColumnarBatch = new CarbonColumnarBatch(vectors,
+          CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT,
+          new boolean[] {});
+    }
+  }
+
+  @Override
+  public Object getCurrentValue() throws IOException, InterruptedException {
+    rowCount += 1;
+    Object[] row = new Object[carbonColumnarBatch.columnVectors.length];
+    for (int i = 0; i < carbonColumnarBatch.columnVectors.length; i++) {
+      if (carbonColumnarBatch.columnVectors[i].getType() == DataTypes.STRING
+          || carbonColumnarBatch.columnVectors[i].getType() == DataTypes.VARCHAR) {
+        byte[] data = (byte[]) carbonColumnarBatch.columnVectors[i].getData(batchIdx - 1);
+        row[i] = ByteUtil.toString(data, 0, data.length);
+      } else {
+        row[i] = carbonColumnarBatch.columnVectors[i].getData(batchIdx - 1);
+      }
+    }
+    return row;
+  }
+
+  @Override public Void getCurrentKey() throws IOException, InterruptedException {
+    return null;
--- End diff --

VectorizedCarbonRecordReader is handled in the same way

---
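The reader above flattens columnar batches back into rows with a small batch-cursor: nextKeyValue() advances batchIdx inside the current CarbonColumnarBatch and pulls the next batch only once the cursor runs past numBatched, and getCurrentValue() reads row batchIdx - 1 because the cursor was pre-incremented. A minimal self-contained Java sketch of that same pattern; IntBatchReader and its int[] batches are hypothetical stand-ins for the CarbonData types, not actual API:

import java.util.Iterator;
import java.util.List;

// Batch-cursor pattern: a row-oriented nextKeyValue()/getCurrentValue() facade
// over an iterator that produces whole batches at a time.
class IntBatchReader {
  private final Iterator<int[]> batches; // stands in for AbstractDetailQueryResultIterator
  private int[] currentBatch;            // stands in for CarbonColumnarBatch
  private int batchIdx = 0;              // cursor within the current batch
  private int numBatched = 0;            // actual row count of the current batch

  IntBatchReader(List<int[]> source) {
    this.batches = source.iterator();
  }

  // Advance the cursor, fetching a fresh batch once the current one is exhausted.
  boolean nextKeyValue() {
    if (batchIdx >= numBatched) {
      if (!batches.hasNext()) return false;
      currentBatch = batches.next();    // mirrors processNextBatch(...)
      numBatched = currentBatch.length; // mirrors getActualSize()
      batchIdx = 0;
    }
    ++batchIdx;
    return true;
  }

  // The current row sits one behind the pre-incremented cursor.
  int getCurrentValue() {
    return currentBatch[batchIdx - 1];
  }

  public static void main(String[] args) {
    IntBatchReader reader = new IntBatchReader(
        List.of(new int[] {1, 2, 3}, new int[] {4, 5}));
    while (reader.nextKeyValue()) {
      System.out.println(reader.getCurrentValue()); // prints 1 through 5
    }
  }
}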
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2869#discussion_r229563650

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java ---
+  @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+    initBatch();
--- End diff --

done

---
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2869#discussion_r229563660

--- Diff: store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java ---
@@ -1737,4 +1738,89 @@ public void testReadNextRowWithProjectionAndRowUtil() {
     }
   }
 
+  @Test
+  public void testVectorReader() {
+    String path = "./testWriteFiles";
+    try {
+      FileUtils.deleteDirectory(new File(path));
+
+      Field[] fields = new Field[12];
+      fields[0] = new Field("stringField", DataTypes.STRING);
+      fields[1] = new Field("shortField", DataTypes.SHORT);
+      fields[2] = new Field("intField", DataTypes.INT);
+      fields[3] = new Field("longField", DataTypes.LONG);
+      fields[4] = new Field("doubleField", DataTypes.DOUBLE);
+      fields[5] = new Field("boolField", DataTypes.BOOLEAN);
+      fields[6] = new Field("dateField", DataTypes.DATE);
+      fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
+      fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
+      fields[9] = new Field("varcharField", DataTypes.VARCHAR);
+      fields[10] = new Field("byteField", DataTypes.BYTE);
+      fields[11] = new Field("floatField", DataTypes.FLOAT);
+      Map<String, String> map = new HashMap<>();
+      map.put("complex_delimiter_level_1", "#");
+      CarbonWriter writer = CarbonWriter.builder()
+          .outputPath(path)
+          .withLoadOptions(map)
+          .withCsvInput(new Schema(fields))
+          .writtenBy("CarbonReaderTest")
+          .build();
+
+      for (int i = 0; i < 10; i++) {
+        String[] row2 = new String[]{
+            "robot" + (i % 10),
+            String.valueOf(i % 10000),
+            String.valueOf(i),
+            String.valueOf(Long.MAX_VALUE - i),
+            String.valueOf((double) i / 2),
+            String.valueOf(true),
+            "2019-03-02",
+            "2019-02-12 03:03:34",
+            "12.345",
+            "varchar",
+            String.valueOf(i),
+            "1.23"
+        };
+        writer.write(row2);
+      }
+      writer.close();
+
+      // Read data
+      CarbonReader reader = CarbonReader
+          .builder(path, "_temp")
+          .withVectorReader(true)
+          .build();
+
+      int i = 0;
+      while (reader.hasNext()) {
+        Object[] data = (Object[]) reader.readNextRow();
+
+        assert (RowUtil.getString(data, 0).equals("robot" + i));
+        assertEquals(RowUtil.getShort(data, 4), i);
+        assertEquals(RowUtil.getInt(data, 5), i);
+        assert (RowUtil.getLong(data, 6) == Long.MAX_VALUE - i);
+        assertEquals(RowUtil.getDouble(data, 7), ((double) i) / 2);
+        assert (RowUtil.getByte(data, 8).equals(new Byte("1")));
+        assertEquals(RowUtil.getInt(data, 1), 17957);
+        assertEquals(RowUtil.getLong(data, 2), 1549920814000000L);
+        assert (RowUtil.getDecimal(data, 9).equals("12.35"));
+        assert (RowUtil.getString(data, 3).equals("varchar"));
+        assertEquals(RowUtil.getByte(data, 10), new Byte(String.valueOf(i)));
+        assertEquals(RowUtil.getFloat(data, 11), new Float("1.23"));
+        i++;
+      }
+      reader.close();
--- End diff --

done

---
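Two of the asserted constants encode how the vector reader surfaces date and timestamp values: a DATE column comes back as an int counting days since the Unix epoch, and a TIMESTAMP column as a long in microseconds since the epoch. The java.time check below reproduces both constants; the +05:30 offset is an assumption about the timezone of the machine that produced the expected value (the test itself only writes the local string "2019-02-12 03:03:34"):

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Sanity-check the date/timestamp encodings asserted in testVectorReader.
public class CarbonEncodingCheck {
  public static void main(String[] args) {
    // DATE: days since the Unix epoch, surfaced as an int.
    long days = LocalDate.parse("2019-03-02").toEpochDay();
    System.out.println(days); // 17957, matching RowUtil.getInt(data, 1)

    // TIMESTAMP: microseconds since the Unix epoch, surfaced as a long.
    // Assumes the writer's JVM ran at UTC+05:30.
    long micros = LocalDateTime.parse("2019-02-12T03:03:34")
        .toInstant(ZoneOffset.ofHoursMinutes(5, 30))
        .getEpochSecond() * 1_000_000L;
    System.out.println(micros); // 1549920814000000, matching RowUtil.getLong(data, 2)
  }
}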
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2869#discussion_r229563684

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java ---
@@ -145,9 +158,33 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO
         externalTableSegments.add(seg);
       }
     }
-    // do block filtering and get split
-    List<InputSplit> splits =
-        getSplits(job, filter, externalTableSegments, null, partitionInfo, null);
+    List<InputSplit> splits = new ArrayList<>();
+    if (isSDK) {
--- End diff --

changed

---
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2869#discussion_r229563694

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java ---
@@ -145,9 +158,33 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO
         externalTableSegments.add(seg);
       }
     }
-    // do block filtering and get split
-    List<InputSplit> splits =
-        getSplits(job, filter, externalTableSegments, null, partitionInfo, null);
+    List<InputSplit> splits = new ArrayList<>();
+    if (isSDK) {
+      for (CarbonFile carbonFile : getAllCarbonDataFiles(carbonTable.getTablePath())) {
--- End diff --

ok

---
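With this change the SDK read path skips block-level pruning altogether and instead turns every .carbondata file under the table path into its own input split. getAllCarbonDataFiles is CarbonData's own helper; the sketch below only illustrates the one-split-per-file enumeration using plain java.nio and is not the actual implementation:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Illustrative stand-in for getAllCarbonDataFiles(tablePath): recursively list
// every .carbondata file so each one can be wrapped in its own input split.
public class CarbonDataFileLister {
  static List<Path> allCarbonDataFiles(Path tablePath) throws IOException {
    try (Stream<Path> walk = Files.walk(tablePath)) {
      return walk.filter(Files::isRegularFile)
          .filter(p -> p.getFileName().toString().endsWith(".carbondata"))
          .collect(Collectors.toList());
    }
  }

  public static void main(String[] args) throws IOException {
    for (Path file : allCarbonDataFiles(Path.of("./testWriteFiles"))) {
      // In CarbonFileInputFormat each such file becomes one CarbonInputSplit
      // covering the byte range [0, file length).
      System.out.println(file + " -> split of " + Files.size(file) + " bytes");
    }
  }
}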
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2869#discussion_r229563709

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java ---
@@ -138,6 +138,19 @@ public CarbonInputSplit(String segmentId, Path path, long start, long length, St
     version = CarbonProperties.getInstance().getFormatVersion();
   }
 
+  public CarbonInputSplit(String segmentId, Path path, long start, long length,
--- End diff --

ok

---
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2869#discussion_r229563795

--- Diff: core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java ---
@@ -305,7 +301,7 @@ public void setBlockDataType(DataType blockDataType) {
   }
 
   @Override public CarbonColumnVector getDictionaryVector() {
-    return dictionaryVector;
+    return null;
--- End diff --

VectorizedCarbonRecordReader is handled in the same way for getCurrentKey().

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2869

Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1387/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2869

Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9438/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2869

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1174/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2869

Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1408/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2869

Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1192/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2869

Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9458/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2869

Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1412/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2869

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1199/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2869

Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9463/

---
Github user kunal642 commented on the issue:
https://github.com/apache/carbondata/pull/2869

retest this please

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2869

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1200/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2869

Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1415/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2869

Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9464/

---