Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2869#discussion_r229918994 --- Diff: store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java --- @@ -1723,6 +1724,92 @@ public void testReadNextRowWithProjectionAndRowUtil() { assertEquals(RowUtil.getFloat(data, 11), (float) 1.23); i++; } + assert (i == 10); + reader.close(); + } catch (Throwable e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } finally { + try { + FileUtils.deleteDirectory(new File(path)); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + } + + @Test + public void testVectorReader() { + String path = "./testWriteFiles"; + try { + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[12]; + fields[0] = new Field("stringField", DataTypes.STRING); + fields[1] = new Field("shortField", DataTypes.SHORT); + fields[2] = new Field("intField", DataTypes.INT); + fields[3] = new Field("longField", DataTypes.LONG); + fields[4] = new Field("doubleField", DataTypes.DOUBLE); + fields[5] = new Field("boolField", DataTypes.BOOLEAN); + fields[6] = new Field("dateField", DataTypes.DATE); + fields[7] = new Field("timeField", DataTypes.TIMESTAMP); + fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2)); + fields[9] = new Field("varcharField", DataTypes.VARCHAR); + fields[10] = new Field("byteField", DataTypes.BYTE); + fields[11] = new Field("floatField", DataTypes.FLOAT); + Map<String, String> map = new HashMap<>(); + map.put("complex_delimiter_level_1", "#"); + CarbonWriter writer = CarbonWriter.builder() + .outputPath(path) + .withLoadOptions(map) + .withCsvInput(new Schema(fields)) + .writtenBy("CarbonReaderTest") + .build(); + + for (int i = 0; i < 10; i++) { + String[] row2 = new String[]{ + "robot" + (i % 10), + String.valueOf(i % 10000), + String.valueOf(i), + String.valueOf(Long.MAX_VALUE - i), + String.valueOf((double) i / 2), + String.valueOf(true), + "2019-03-02", + "2019-02-12 03:03:34", + "12.345", + "varchar", + String.valueOf(i), + "1.23" + }; + writer.write(row2); + } + writer.close(); + + // Read data + CarbonReader reader = CarbonReader + .builder(path, "_temp") + .withVectorReader(true) + .build(); + + int i = 0; + while (reader.hasNext()) { + Object[] data = (Object[]) reader.readNextRow(); + + assert (RowUtil.getString(data, 0).equals("robot" + i)); + assertEquals(RowUtil.getShort(data, 4), i); + assertEquals(RowUtil.getInt(data, 5), i); + assert (RowUtil.getLong(data, 6) == Long.MAX_VALUE - i); + assertEquals(RowUtil.getDouble(data, 7), ((double) i) / 2); + assert (RowUtil.getByte(data, 8).equals(new Byte("1"))); + assertEquals(RowUtil.getInt(data, 1), 17957); + assertEquals(RowUtil.getLong(data, 2), 1549920814000000L); --- End diff -- timestamp value is different between local machine and CI machine. --- |
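One way to keep that assertion portable is to derive the expected value from the same timestamp string the test writes, instead of hard-coding the epoch. A minimal sketch, assuming the load path parses the timestamp with the JVM's default timezone and reusing the test's `data` row:

```java
// Derive the expected microsecond epoch from the written value "2019-02-12 03:03:34",
// so local and CI machines agree regardless of their timezone settings.
// (The ParseException is covered by the test's existing catch (Throwable) block.)
java.text.SimpleDateFormat fmt = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long expectedMicros = fmt.parse("2019-02-12 03:03:34").getTime() * 1000L;
assertEquals(RowUtil.getLong(data, 2), expectedMicros);
```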
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2869#discussion_r230247332 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java --- @@ -524,6 +524,22 @@ public DataOutputStream getDataOutputStreamUsingAppend(String path, FileFactory. return getFiles(listStatus); } + @Override public List<CarbonFile> listFiles(Boolean recursive, CarbonFileFilter fileFilter) --- End diff -- Move `@Override` down to its own line and add documentation for this public method. --- |
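For illustration, the requested shape would be roughly the following; the Javadoc wording and the body are placeholders, and any throws clause the real method has is omitted because the diff is truncated at the signature:

```java
/**
 * Lists the files under this path, optionally recursing into sub-directories,
 * and returns only the files accepted by the given filter.
 */
@Override
public List<CarbonFile> listFiles(Boolean recursive, CarbonFileFilter fileFilter) {
  // ... existing listing logic ...
}
```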
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2869#discussion_r230247574 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java --- @@ -347,9 +347,7 @@ private void fillVector(ColumnPage columnPage, CarbonColumnVector vector, columnPage.getNullBits()); } else if (vectorDataType == DataTypes.FLOAT) { float[] floatPage = columnPage.getFloatPage(); - for (int i = 0; i < pageSize; i++) { - vector.putFloats(0, pageSize, floatPage, 0); - } + vector.putFloats(0, pageSize, floatPage, 0); --- End diff -- Also remove lines 322 to 334, as they are duplicated. --- |
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2869#discussion_r230251185 --- Diff: store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java --- @@ -1737,4 +1738,89 @@ public void testReadNextRowWithProjectionAndRowUtil() { } } + @Test + public void testVectorReader() { + String path = "./testWriteFiles"; + try { + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[12]; + fields[0] = new Field("stringField", DataTypes.STRING); + fields[1] = new Field("shortField", DataTypes.SHORT); + fields[2] = new Field("intField", DataTypes.INT); + fields[3] = new Field("longField", DataTypes.LONG); + fields[4] = new Field("doubleField", DataTypes.DOUBLE); + fields[5] = new Field("boolField", DataTypes.BOOLEAN); + fields[6] = new Field("dateField", DataTypes.DATE); + fields[7] = new Field("timeField", DataTypes.TIMESTAMP); + fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2)); + fields[9] = new Field("varcharField", DataTypes.VARCHAR); + fields[10] = new Field("byteField", DataTypes.BYTE); + fields[11] = new Field("floatField", DataTypes.FLOAT); + Map<String, String> map = new HashMap<>(); + map.put("complex_delimiter_level_1", "#"); + CarbonWriter writer = CarbonWriter.builder() + .outputPath(path) + .withLoadOptions(map) + .withCsvInput(new Schema(fields)) + .writtenBy("CarbonReaderTest") + .build(); + + for (int i = 0; i < 10; i++) { + String[] row2 = new String[]{ + "robot" + (i % 10), + String.valueOf(i % 10000), + String.valueOf(i), + String.valueOf(Long.MAX_VALUE - i), + String.valueOf((double) i / 2), + String.valueOf(true), + "2019-03-02", + "2019-02-12 03:03:34", + "12.345", + "varchar", + String.valueOf(i), + "1.23" + }; + writer.write(row2); + } + writer.close(); + + // Read data + CarbonReader reader = CarbonReader + .builder(path, "_temp") + .withVectorReader(true) + .build(); + + int i = 0; + while (reader.hasNext()) { + Object[] data = (Object[]) reader.readNextRow(); + + assert (RowUtil.getString(data, 0).equals("robot" + i)); + assertEquals(RowUtil.getShort(data, 4), i); + assertEquals(RowUtil.getInt(data, 5), i); + assert (RowUtil.getLong(data, 6) == Long.MAX_VALUE - i); + assertEquals(RowUtil.getDouble(data, 7), ((double) i) / 2); + assert (RowUtil.getByte(data, 8).equals(new Byte("1"))); + assertEquals(RowUtil.getInt(data, 1), 17957); + assertEquals(RowUtil.getLong(data, 2), 1549920814000000L); + assert (RowUtil.getDecimal(data, 9).equals("12.35")); + assert (RowUtil.getString(data, 3).equals("varchar")); + assertEquals(RowUtil.getByte(data, 10), new Byte(String.valueOf(i))); + assertEquals(RowUtil.getFloat(data, 11), new Float("1.23")); + i++; + } + reader.close(); --- End diff -- where? --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2869#discussion_r230251542 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java --- @@ -88,6 +99,50 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO } } + /** + * This method will list all the carbondata files in the table path and treat one carbondata + * file as one split. + */ + public List<InputSplit> getAllFileSplits(JobContext job) throws IOException { + List<InputSplit> splits = new ArrayList<>(); + CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration()); + if (null == carbonTable) { + throw new IOException("Missing/Corrupt schema file for table."); + } + for (CarbonFile carbonFile : getAllCarbonDataFiles(carbonTable.getTablePath())) { + CarbonInputSplit split = + new CarbonInputSplit("null", new Path(carbonFile.getAbsolutePath()), 0, + carbonFile.getLength(), carbonFile.getLocations(), FileFormat.COLUMNAR_V3); + split.setVersion(ColumnarFormatVersion.V3); + BlockletDetailInfo info = new BlockletDetailInfo(); + split.setDetailInfo(info); + info.setBlockSize(carbonFile.getLength()); + // Read the footer offset and set. + FileReader reader = FileFactory --- End diff -- Reading the file footer offset should not happen inside getSplits, as it will increase the getSplits time when there are many files. It should be handled during record reader initialization on the executor side, or inside the thread. --- |
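Roughly, the suggestion is to keep getSplits limited to cheap metadata and defer the footer read to each split's reader. A sketch under assumptions: `readFooterOffsetFromFileTail` is a hypothetical helper, the BlockletDetailInfo accessors are taken from the diff above, and the footer offset is assumed to be the trailing 8-byte long of the carbondata file (which is what the FileReader call above reads):

```java
// Driver side (getAllFileSplits): only cheap metadata, no per-file I/O.
BlockletDetailInfo info = new BlockletDetailInfo();
info.setBlockSize(carbonFile.getLength());
split.setDetailInfo(info);

// Executor side (CarbonVectorizedRecordReader.initialize): read the footer offset once per split.
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext context)
    throws IOException, InterruptedException {
  CarbonInputSplit split = (CarbonInputSplit) inputSplit;
  // Hypothetical helper: reads the trailing 8-byte long that points to the file footer.
  long footerOffset = readFooterOffsetFromFileTail(split.getPath(), split.getLength());
  split.getDetailInfo().setBlockFooterOffset(footerOffset);
  // ... existing query model / executor setup ...
}
```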
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2869#discussion_r230251863 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java --- @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.hadoop.util; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; +import org.apache.carbondata.core.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.datatype.DecimalType; +import org.apache.carbondata.core.metadata.datatype.StructField; +import org.apache.carbondata.core.scan.executor.QueryExecutor; +import org.apache.carbondata.core.scan.executor.QueryExecutorFactory; +import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; +import org.apache.carbondata.core.scan.model.ProjectionDimension; +import org.apache.carbondata.core.scan.model.ProjectionMeasure; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch; +import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl; +import org.apache.carbondata.core.util.ByteUtil; +import org.apache.carbondata.hadoop.AbstractRecordReader; +import org.apache.carbondata.hadoop.CarbonInputSplit; + +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.log4j.Logger; + +/** + * A specialized RecordReader that reads into CarbonColumnarBatches directly using the + * carbondata column APIs and fills the data directly into columns. 
+ */ +public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> { + + private static final Logger LOGGER = + LogServiceFactory.getLogService(CarbonVectorizedRecordReader.class.getName()); + + private CarbonColumnarBatch carbonColumnarBatch; + + private QueryExecutor queryExecutor; + + private int batchIdx = 0; + + private int numBatched = 0; + + private AbstractDetailQueryResultIterator iterator; + + private QueryModel queryModel; + + public CarbonVectorizedRecordReader(QueryModel queryModel) { + this.queryModel = queryModel; + } + + @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException { + List<CarbonInputSplit> splitList; + if (inputSplit instanceof CarbonInputSplit) { + splitList = new ArrayList<>(1); + splitList.add((CarbonInputSplit) inputSplit); + } else { + throw new RuntimeException("unsupported input split type: " + inputSplit); + } + List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList); + queryModel.setTableBlockInfos(tableBlockInfoList); + queryModel.setVectorReader(true); + try { + queryExecutor = + QueryExecutorFactory.getQueryExecutor(queryModel, taskAttemptContext.getConfiguration()); + iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel); + } catch (QueryExecutionException e) { + LOGGER.error(e); + throw new InterruptedException(e.getMessage()); + } catch (Exception e) { + LOGGER.error(e); + throw e; + } + } + + @Override public boolean nextKeyValue() throws IOException, InterruptedException { + initBatch(); + if (batchIdx >= numBatched) { + if (!nextBatch()) return false; + } + ++batchIdx; + return true; + } + + + private boolean nextBatch() { + carbonColumnarBatch.reset(); + if (iterator.hasNext()) { + iterator.processNextBatch(carbonColumnarBatch); + numBatched = carbonColumnarBatch.getActualSize(); + batchIdx = 0; + return true; + } + return false; + } + + private void initBatch() { + if (carbonColumnarBatch == null) { + List<ProjectionDimension> queryDimension = queryModel.getProjectionDimensions(); + List<ProjectionMeasure> queryMeasures = queryModel.getProjectionMeasures(); + StructField[] fields = new StructField[queryDimension.size() + queryMeasures.size()]; + for (ProjectionDimension dim : queryDimension) { + fields[dim.getOrdinal()] = + new StructField(dim.getColumnName(), dim.getDimension().getDataType()); + } + for (ProjectionMeasure msr : queryMeasures) { + DataType dataType = msr.getMeasure().getDataType(); + if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT + || dataType == DataTypes.INT || dataType == DataTypes.LONG + || dataType == DataTypes.FLOAT || dataType == DataTypes.BYTE) { + fields[msr.getOrdinal()] = + new StructField(msr.getColumnName(), msr.getMeasure().getDataType()); + } else if (DataTypes.isDecimal(dataType)) { + fields[msr.getOrdinal()] = new StructField(msr.getColumnName(), + new DecimalType(msr.getMeasure().getPrecision(), msr.getMeasure().getScale())); + } else { + fields[msr.getOrdinal()] = new StructField(msr.getColumnName(), DataTypes.DOUBLE); + } + } + CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length]; + for (int i = 0; i < fields.length; i++) { + vectors[i] = new CarbonColumnVectorImpl( + CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT, + fields[i].getDataType()); + } + carbonColumnarBatch = new CarbonColumnarBatch(vectors, + CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT, + new 
boolean[] {}); + } + } + + @Override + public Object getCurrentValue() throws IOException, InterruptedException { + rowCount += 1; + Object[] row = new Object[carbonColumnarBatch.columnVectors.length]; + for (int i = 0; i < carbonColumnarBatch.columnVectors.length; i ++) { + if (carbonColumnarBatch.columnVectors[i].getType() == DataTypes.STRING + || carbonColumnarBatch.columnVectors[i].getType() == DataTypes.VARCHAR) { + byte[] data = (byte[]) carbonColumnarBatch.columnVectors[i].getData(batchIdx - 1); + row[i] = ByteUtil.toString(data, 0, data.length); + } else { + row[i] = carbonColumnarBatch.columnVectors[i].getData(batchIdx - 1); + } + } + return row; + } + + @Override public Void getCurrentKey() throws IOException, InterruptedException { + return null; --- End diff -- Better add an unsupported exception. --- |
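What is being asked for would look roughly like this (the message text is illustrative):

```java
@Override
public Void getCurrentKey() throws IOException, InterruptedException {
  // Keys are not meaningful for this reader, so fail loudly instead of returning null.
  throw new UnsupportedOperationException(
      "getCurrentKey is not supported by CarbonVectorizedRecordReader");
}
```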
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2869#discussion_r230252316 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java --- @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.hadoop.util; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; +import org.apache.carbondata.core.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.datatype.DecimalType; +import org.apache.carbondata.core.metadata.datatype.StructField; +import org.apache.carbondata.core.scan.executor.QueryExecutor; +import org.apache.carbondata.core.scan.executor.QueryExecutorFactory; +import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; +import org.apache.carbondata.core.scan.model.ProjectionDimension; +import org.apache.carbondata.core.scan.model.ProjectionMeasure; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch; +import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl; +import org.apache.carbondata.core.util.ByteUtil; +import org.apache.carbondata.hadoop.AbstractRecordReader; +import org.apache.carbondata.hadoop.CarbonInputSplit; + +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.log4j.Logger; + +/** + * A specialized RecordReader that reads into CarbonColumnarBatches directly using the + * carbondata column APIs and fills the data directly into columns. 
+ */ +public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> { + + private static final Logger LOGGER = + LogServiceFactory.getLogService(CarbonVectorizedRecordReader.class.getName()); + + private CarbonColumnarBatch carbonColumnarBatch; + + private QueryExecutor queryExecutor; + + private int batchIdx = 0; + + private int numBatched = 0; + + private AbstractDetailQueryResultIterator iterator; + + private QueryModel queryModel; + + public CarbonVectorizedRecordReader(QueryModel queryModel) { + this.queryModel = queryModel; + } + + @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException { + List<CarbonInputSplit> splitList; + if (inputSplit instanceof CarbonInputSplit) { + splitList = new ArrayList<>(1); + splitList.add((CarbonInputSplit) inputSplit); + } else { + throw new RuntimeException("unsupported input split type: " + inputSplit); + } + List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList); + queryModel.setTableBlockInfos(tableBlockInfoList); + queryModel.setVectorReader(true); + try { + queryExecutor = + QueryExecutorFactory.getQueryExecutor(queryModel, taskAttemptContext.getConfiguration()); + iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel); + initBatch(); + } catch (QueryExecutionException e) { + LOGGER.error(e); + throw new InterruptedException(e.getMessage()); + } catch (Exception e) { + LOGGER.error(e); + throw e; + } + } + + @Override public boolean nextKeyValue() throws IOException, InterruptedException { + if (batchIdx >= numBatched) { + if (!nextBatch()) return false; + } + ++batchIdx; + return true; + } + + + private boolean nextBatch() { + carbonColumnarBatch.reset(); + if (iterator.hasNext()) { + iterator.processNextBatch(carbonColumnarBatch); + numBatched = carbonColumnarBatch.getActualSize(); + batchIdx = 0; + return true; + } + return false; + } + + private void initBatch() { + if (carbonColumnarBatch == null) { + List<ProjectionDimension> queryDimension = queryModel.getProjectionDimensions(); + List<ProjectionMeasure> queryMeasures = queryModel.getProjectionMeasures(); + StructField[] fields = new StructField[queryDimension.size() + queryMeasures.size()]; + for (ProjectionDimension dim : queryDimension) { + fields[dim.getOrdinal()] = + new StructField(dim.getColumnName(), dim.getDimension().getDataType()); + } + for (ProjectionMeasure msr : queryMeasures) { + DataType dataType = msr.getMeasure().getDataType(); + if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT + || dataType == DataTypes.INT || dataType == DataTypes.LONG + || dataType == DataTypes.FLOAT || dataType == DataTypes.BYTE) { + fields[msr.getOrdinal()] = + new StructField(msr.getColumnName(), msr.getMeasure().getDataType()); + } else if (DataTypes.isDecimal(dataType)) { + fields[msr.getOrdinal()] = new StructField(msr.getColumnName(), + new DecimalType(msr.getMeasure().getPrecision(), msr.getMeasure().getScale())); + } else { + fields[msr.getOrdinal()] = new StructField(msr.getColumnName(), DataTypes.DOUBLE); + } + } + CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length]; + for (int i = 0; i < fields.length; i++) { + vectors[i] = new CarbonColumnVectorImpl( + CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT, + fields[i].getDataType()); + } + carbonColumnarBatch = new CarbonColumnarBatch(vectors, + CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT, + new 
boolean[] {}); + } + } + + @Override + public Object getCurrentValue() throws IOException, InterruptedException { + rowCount += 1; + Object[] row = new Object[carbonColumnarBatch.columnVectors.length]; + for (int i = 0; i < carbonColumnarBatch.columnVectors.length; i ++) { + if (carbonColumnarBatch.columnVectors[i].getType() == DataTypes.STRING + || carbonColumnarBatch.columnVectors[i].getType() == DataTypes.VARCHAR) { + byte[] data = (byte[]) carbonColumnarBatch.columnVectors[i].getData(batchIdx - 1); + row[i] = ByteUtil.toString(data, 0, data.length); + } else { + row[i] = carbonColumnarBatch.columnVectors[i].getData(batchIdx - 1); + } + } + return row; + } + + @Override public Void getCurrentKey() throws IOException, InterruptedException { --- End diff -- Why do you need throw IOException, InterruptedException in here? --- |
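For context, org.apache.hadoop.mapreduce.RecordReader declares `getCurrentKey() throws IOException, InterruptedException`, so the override simply inherited that clause. Because this implementation never throws either exception, the clause can be narrowed away, e.g. combined with the earlier suggestion:

```java
// Overrides may drop checked exceptions they never actually throw.
@Override
public Void getCurrentKey() {
  throw new UnsupportedOperationException(
      "getCurrentKey is not supported by CarbonVectorizedRecordReader");
}
```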
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2869#discussion_r230252544 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java --- @@ -88,6 +99,50 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO } } + /** + * This method will list all the carbondata files in the table path and treat one carbondata + * file as one split. + */ + public List<InputSplit> getAllFileSplits(JobContext job) throws IOException { --- End diff -- Don't add more public methods. Please take a configuration property from outside and use it to decide whether to go through the datamap or just list the splits plainly like this. --- |
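A sketch of the direction being suggested; the property handling and helper names are illustrative, and only the "filter_blocks" key appears later in this thread:

```java
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
  // Branch on a job-conf flag instead of exposing a second public entry point.
  boolean filterBlocks = job.getConfiguration().getBoolean("filter_blocks", true);
  if (filterBlocks) {
    return getSplitsUsingDataMap(job);        // hypothetical name for the existing pruned path
  }
  return listOneSplitPerCarbonDataFile(job);  // hypothetical name for the plain listing
}
```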
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2869#discussion_r230256502 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java --- @@ -158,14 +173,31 @@ public CarbonReaderBuilder withHadoopConf(String key, String value) { } try { - final List<InputSplit> splits = - format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID())); - + List<InputSplit> splits; + if (filterExpression == null) { + splits = format.getAllFileSplits(job); + } else { + splits = format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID())); + } List<RecordReader<Void, T>> readers = new ArrayList<>(splits.size()); for (InputSplit split : splits) { TaskAttemptContextImpl attempt = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID()); - RecordReader reader = format.createRecordReader(split, attempt); + RecordReader reader; + QueryModel queryModel = format.createQueryModel(split, attempt); + boolean hasComplex = false; + for (ProjectionDimension projectionDimension : queryModel.getProjectionDimensions()) { + if (projectionDimension.getDimension().isComplex()) { --- End diff -- Can you support Array<String>? because SDK/CSDK user need use this data type. --- |
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2869#discussion_r230264659 --- Diff: store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java --- @@ -1723,6 +1724,92 @@ public void testReadNextRowWithProjectionAndRowUtil() { assertEquals(RowUtil.getFloat(data, 11), (float) 1.23); i++; } + assert (i == 10); + reader.close(); + } catch (Throwable e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } finally { + try { + FileUtils.deleteDirectory(new File(path)); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + } + + @Test + public void testVectorReader() { + String path = "./testWriteFiles"; + try { + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[12]; + fields[0] = new Field("stringField", DataTypes.STRING); + fields[1] = new Field("shortField", DataTypes.SHORT); + fields[2] = new Field("intField", DataTypes.INT); + fields[3] = new Field("longField", DataTypes.LONG); + fields[4] = new Field("doubleField", DataTypes.DOUBLE); + fields[5] = new Field("boolField", DataTypes.BOOLEAN); + fields[6] = new Field("dateField", DataTypes.DATE); + fields[7] = new Field("timeField", DataTypes.TIMESTAMP); + fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2)); + fields[9] = new Field("varcharField", DataTypes.VARCHAR); + fields[10] = new Field("byteField", DataTypes.BYTE); + fields[11] = new Field("floatField", DataTypes.FLOAT); + Map<String, String> map = new HashMap<>(); + map.put("complex_delimiter_level_1", "#"); + CarbonWriter writer = CarbonWriter.builder() + .outputPath(path) + .withLoadOptions(map) + .withCsvInput(new Schema(fields)) + .writtenBy("CarbonReaderTest") + .build(); + + for (int i = 0; i < 10; i++) { + String[] row2 = new String[]{ + "robot" + (i % 10), + String.valueOf(i % 10000), + String.valueOf(i), + String.valueOf(Long.MAX_VALUE - i), + String.valueOf((double) i / 2), + String.valueOf(true), + "2019-03-02", + "2019-02-12 03:03:34", + "12.345", + "varchar", + String.valueOf(i), + "1.23" + }; + writer.write(row2); + } + writer.close(); + + // Read data + CarbonReader reader = CarbonReader + .builder(path, "_temp") + .withVectorReader(true) + .build(); + + int i = 0; + while (reader.hasNext()) { + Object[] data = (Object[]) reader.readNextRow(); + + assert (RowUtil.getString(data, 0).equals("robot" + i)); + assertEquals(RowUtil.getShort(data, 4), i); + assertEquals(RowUtil.getInt(data, 5), i); + assert (RowUtil.getLong(data, 6) == Long.MAX_VALUE - i); + assertEquals(RowUtil.getDouble(data, 7), ((double) i) / 2); + assert (RowUtil.getByte(data, 8).equals(new Byte("1"))); --- End diff -- Can you support getBoolean? we shouldn't return byte for boolean directly --- |
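A possible shape for such a helper, purely hypothetical, assuming the vector reader currently surfaces BOOLEAN columns as a Byte of 0/1 as the test above observes:

```java
// Hypothetical RowUtil-style helper: expose booleans as booleans to SDK/CSDK users
// even when the underlying vector stores them as bytes.
public static boolean getBoolean(Object[] data, int ordinal) {
  Object value = data[ordinal];
  if (value instanceof Boolean) {
    return (Boolean) value;
  }
  return ((Byte) value) != 0;
}
```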
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2869#discussion_r230273015 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java --- @@ -158,14 +173,31 @@ public CarbonReaderBuilder withHadoopConf(String key, String value) { } try { - final List<InputSplit> splits = - format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID())); - + List<InputSplit> splits; + if (filterExpression == null) { + splits = format.getAllFileSplits(job); + } else { + splits = format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID())); + } List<RecordReader<Void, T>> readers = new ArrayList<>(splits.size()); for (InputSplit split : splits) { TaskAttemptContextImpl attempt = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID()); - RecordReader reader = format.createRecordReader(split, attempt); + RecordReader reader; + QueryModel queryModel = format.createQueryModel(split, attempt); + boolean hasComplex = false; + for (ProjectionDimension projectionDimension : queryModel.getProjectionDimensions()) { + if (projectionDimension.getDimension().isComplex()) { --- End diff -- The vectorized reader is not supported for complex types. When CarbonSession supports complex types, we can enable it for the SDK as well. --- |
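For illustration, the check started in the diff above is typically completed along these lines (a sketch; the assumption is that any complex projection falls back to the plain record reader):

```java
boolean hasComplex = false;
for (ProjectionDimension projectionDimension : queryModel.getProjectionDimensions()) {
  if (projectionDimension.getDimension().isComplex()) {
    hasComplex = true;
    break;
  }
}
// Use the vectorized path only when no projected dimension is complex.
if (useVectorReader && !hasComplex) {
  reader = new CarbonVectorizedRecordReader(queryModel);
} else {
  reader = format.createRecordReader(split, attempt);
}
```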
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2869#discussion_r230273109 --- Diff: store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java --- @@ -1723,6 +1724,92 @@ public void testReadNextRowWithProjectionAndRowUtil() { assertEquals(RowUtil.getFloat(data, 11), (float) 1.23); i++; } + assert (i == 10); + reader.close(); + } catch (Throwable e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } finally { + try { + FileUtils.deleteDirectory(new File(path)); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + } + + @Test + public void testVectorReader() { + String path = "./testWriteFiles"; + try { + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[12]; + fields[0] = new Field("stringField", DataTypes.STRING); + fields[1] = new Field("shortField", DataTypes.SHORT); + fields[2] = new Field("intField", DataTypes.INT); + fields[3] = new Field("longField", DataTypes.LONG); + fields[4] = new Field("doubleField", DataTypes.DOUBLE); + fields[5] = new Field("boolField", DataTypes.BOOLEAN); + fields[6] = new Field("dateField", DataTypes.DATE); + fields[7] = new Field("timeField", DataTypes.TIMESTAMP); + fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2)); + fields[9] = new Field("varcharField", DataTypes.VARCHAR); + fields[10] = new Field("byteField", DataTypes.BYTE); + fields[11] = new Field("floatField", DataTypes.FLOAT); + Map<String, String> map = new HashMap<>(); + map.put("complex_delimiter_level_1", "#"); + CarbonWriter writer = CarbonWriter.builder() + .outputPath(path) + .withLoadOptions(map) + .withCsvInput(new Schema(fields)) + .writtenBy("CarbonReaderTest") + .build(); + + for (int i = 0; i < 10; i++) { + String[] row2 = new String[]{ + "robot" + (i % 10), + String.valueOf(i % 10000), + String.valueOf(i), + String.valueOf(Long.MAX_VALUE - i), + String.valueOf((double) i / 2), + String.valueOf(true), + "2019-03-02", + "2019-02-12 03:03:34", + "12.345", + "varchar", + String.valueOf(i), + "1.23" + }; + writer.write(row2); + } + writer.close(); + + // Read data + CarbonReader reader = CarbonReader + .builder(path, "_temp") + .withVectorReader(true) + .build(); + + int i = 0; + while (reader.hasNext()) { + Object[] data = (Object[]) reader.readNextRow(); + + assert (RowUtil.getString(data, 0).equals("robot" + i)); + assertEquals(RowUtil.getShort(data, 4), i); + assertEquals(RowUtil.getInt(data, 5), i); + assert (RowUtil.getLong(data, 6) == Long.MAX_VALUE - i); + assertEquals(RowUtil.getDouble(data, 7), ((double) i) / 2); + assert (RowUtil.getByte(data, 8).equals(new Byte("1"))); --- End diff -- ok --- |
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2869#discussion_r230274806 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java --- @@ -51,6 +54,7 @@ private Expression filterExpression; private String tableName; private Configuration hadoopConf; + private boolean useVectorReader; --- End diff -- I suggest we make the vector reader the default, since we see a performance benefit. withVectorReader() can be deprecated and kept only to disable it in case of issues, until the vector reader is stabilized. @ravipesala, is that fine? --- |
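Sketched, the suggestion amounts to flipping the default and keeping the existing setter only as an escape hatch (the Javadoc and deprecation wording are illustrative):

```java
// Default to the vector reader, since it is the faster path.
private boolean useVectorReader = true;

/**
 * Kept only to disable vector reading in case of issues,
 * until the vector reader is considered stable.
 */
@Deprecated
public CarbonReaderBuilder withVectorReader(boolean useVectorReader) {
  this.useVectorReader = useVectorReader;
  return this;
}
```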
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2869#discussion_r230277006 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java --- @@ -524,6 +524,22 @@ public DataOutputStream getDataOutputStreamUsingAppend(String path, FileFactory. return getFiles(listStatus); } + @Override public List<CarbonFile> listFiles(Boolean recursive, CarbonFileFilter fileFilter) --- End diff -- ok --- |
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2869#discussion_r230277015 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java --- @@ -347,9 +347,7 @@ private void fillVector(ColumnPage columnPage, CarbonColumnVector vector, columnPage.getNullBits()); } else if (vectorDataType == DataTypes.FLOAT) { float[] floatPage = columnPage.getFloatPage(); - for (int i = 0; i < pageSize; i++) { - vector.putFloats(0, pageSize, floatPage, 0); - } + vector.putFloats(0, pageSize, floatPage, 0); --- End diff -- removed --- |
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2869#discussion_r230277069 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java --- @@ -88,6 +99,50 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO } } + /** + * This method will list all the carbondata files in the table path and treat one carbondata + * file as one split. + */ + public List<InputSplit> getAllFileSplits(JobContext job) throws IOException { + List<InputSplit> splits = new ArrayList<>(); + CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration()); + if (null == carbonTable) { + throw new IOException("Missing/Corrupt schema file for table."); + } + for (CarbonFile carbonFile : getAllCarbonDataFiles(carbonTable.getTablePath())) { + CarbonInputSplit split = + new CarbonInputSplit("null", new Path(carbonFile.getAbsolutePath()), 0, + carbonFile.getLength(), carbonFile.getLocations(), FileFormat.COLUMNAR_V3); + split.setVersion(ColumnarFormatVersion.V3); + BlockletDetailInfo info = new BlockletDetailInfo(); + split.setDetailInfo(info); + info.setBlockSize(carbonFile.getLength()); + // Read the footer offset and set. + FileReader reader = FileFactory --- End diff -- moved to CarbonVectorizedRecordReader.initialize() --- |
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2869#discussion_r230277096 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java --- @@ -88,6 +99,50 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO } } + /** + * This method will list all the carbondata files in the table path and treat one carbondata + * file as one split. + */ + public List<InputSplit> getAllFileSplits(JobContext job) throws IOException { --- End diff -- added a flag "filter_blocks" in jobConf --- |
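On the SDK side, that flag would be set on the job configuration before asking for splits, roughly as below; the key comes from the comment above, while the exact wiring is an assumption:

```java
// With no filter expression there is nothing to prune, so ask the input format
// to skip block pruning and list one split per carbondata file.
if (filterExpression == null) {
  job.getConfiguration().setBoolean("filter_blocks", false);
}
List<InputSplit> splits =
    format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
```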
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2869 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1233/ --- |
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2869#discussion_r230286721 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java --- @@ -158,14 +173,31 @@ public CarbonReaderBuilder withHadoopConf(String key, String value) { } try { - final List<InputSplit> splits = - format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID())); - + List<InputSplit> splits; + if (filterExpression == null) { + splits = format.getAllFileSplits(job); + } else { + splits = format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID())); + } List<RecordReader<Void, T>> readers = new ArrayList<>(splits.size()); for (InputSplit split : splits) { TaskAttemptContextImpl attempt = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID()); - RecordReader reader = format.createRecordReader(split, attempt); + RecordReader reader; + QueryModel queryModel = format.createQueryModel(split, attempt); + boolean hasComplex = false; + for (ProjectionDimension projectionDimension : queryModel.getProjectionDimensions()) { + if (projectionDimension.getDimension().isComplex()) { --- End diff -- we should support it in this version or next version. design and implement --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2869 Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1448/ --- |