[GitHub] carbondata pull request #2869: [WIP] Changes for improving carbon reader per...

[GitHub] carbondata pull request #2869: [CARBONDATA-3057] Implement VectorizedReader ...

Github user xubo245 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2869#discussion_r229918994
 
    --- Diff: store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java ---
    @@ -1723,6 +1724,92 @@ public void testReadNextRowWithProjectionAndRowUtil() {
             assertEquals(RowUtil.getFloat(data, 11), (float) 1.23);
             i++;
           }
    +      assert  (i == 10);
    +      reader.close();
    +    } catch (Throwable e) {
    +      e.printStackTrace();
    +      Assert.fail(e.getMessage());
    +    } finally {
    +      try {
    +        FileUtils.deleteDirectory(new File(path));
    +      } catch (IOException e) {
    +        e.printStackTrace();
    +        Assert.fail(e.getMessage());
    +      }
    +    }
    +  }
    +
    +  @Test
    +  public void testVectorReader() {
    +    String path = "./testWriteFiles";
    +    try {
    +      FileUtils.deleteDirectory(new File(path));
    +
    +      Field[] fields = new Field[12];
    +      fields[0] = new Field("stringField", DataTypes.STRING);
    +      fields[1] = new Field("shortField", DataTypes.SHORT);
    +      fields[2] = new Field("intField", DataTypes.INT);
    +      fields[3] = new Field("longField", DataTypes.LONG);
    +      fields[4] = new Field("doubleField", DataTypes.DOUBLE);
    +      fields[5] = new Field("boolField", DataTypes.BOOLEAN);
    +      fields[6] = new Field("dateField", DataTypes.DATE);
    +      fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
    +      fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
    +      fields[9] = new Field("varcharField", DataTypes.VARCHAR);
    +      fields[10] = new Field("byteField", DataTypes.BYTE);
    +      fields[11] = new Field("floatField", DataTypes.FLOAT);
    +      Map<String, String> map = new HashMap<>();
    +      map.put("complex_delimiter_level_1", "#");
    +      CarbonWriter writer = CarbonWriter.builder()
    +          .outputPath(path)
    +          .withLoadOptions(map)
    +          .withCsvInput(new Schema(fields))
    +          .writtenBy("CarbonReaderTest")
    +          .build();
    +
    +      for (int i = 0; i < 10; i++) {
    +        String[] row2 = new String[]{
    +            "robot" + (i % 10),
    +            String.valueOf(i % 10000),
    +            String.valueOf(i),
    +            String.valueOf(Long.MAX_VALUE - i),
    +            String.valueOf((double) i / 2),
    +            String.valueOf(true),
    +            "2019-03-02",
    +            "2019-02-12 03:03:34",
    +            "12.345",
    +            "varchar",
    +            String.valueOf(i),
    +            "1.23"
    +        };
    +        writer.write(row2);
    +      }
    +      writer.close();
    +
    +      // Read data
    +      CarbonReader reader = CarbonReader
    +          .builder(path, "_temp")
    +          .withVectorReader(true)
    +          .build();
    +
    +      int i = 0;
    +      while (reader.hasNext()) {
    +        Object[] data = (Object[]) reader.readNextRow();
    +
    +        assert (RowUtil.getString(data, 0).equals("robot" + i));
    +        assertEquals(RowUtil.getShort(data, 4), i);
    +        assertEquals(RowUtil.getInt(data, 5), i);
    +        assert (RowUtil.getLong(data, 6) == Long.MAX_VALUE - i);
    +        assertEquals(RowUtil.getDouble(data, 7), ((double) i) / 2);
    +        assert (RowUtil.getByte(data, 8).equals(new Byte("1")));
    +        assertEquals(RowUtil.getInt(data, 1), 17957);
    +        assertEquals(RowUtil.getLong(data, 2), 1549920814000000L);
    --- End diff --
   
    The timestamp value is different between the local machine and the CI machine.
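
    One way to make this assertion machine-independent is to derive the expected
    value from the same literal the test writes. A minimal sketch, assuming the
    reader returns epoch microseconds (as the hardcoded literal suggests) and
    parsing in the JVM's default time zone:

        // Sketch only: compute the expected timestamp from the written literal
        // so the assertion holds regardless of the machine's time zone.
        // Needs: import java.text.SimpleDateFormat;
        // (the test body already catches Throwable, so the checked
        // ParseException from parse() is fine here)
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        long expectedMicros = format.parse("2019-02-12 03:03:34").getTime() * 1000L;
        assertEquals(RowUtil.getLong(data, 2), expectedMicros);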


---

[GitHub] carbondata pull request #2869: [CARBONDATA-3057] Implement VectorizedReader ...

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2869#discussion_r230247332
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java ---
    @@ -524,6 +524,22 @@ public DataOutputStream getDataOutputStreamUsingAppend(String path, FileFactory.
         return getFiles(listStatus);
       }
     
    +  @Override public List<CarbonFile> listFiles(Boolean recursive, CarbonFileFilter fileFilter)
    --- End diff --
   
    Move `@Override` to its own line and add documentation for this public method.
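
    That is, something along these lines (the doc text here is only illustrative):

        /**
         * Lists the files under this path, recursing into subdirectories when
         * recursive is true, and returns only the entries accepted by the filter.
         */
        @Override
        public List<CarbonFile> listFiles(Boolean recursive, CarbonFileFilter fileFilter) {
          // ... existing implementation
        }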


---

[GitHub] carbondata pull request #2869: [CARBONDATA-3057] Implement VectorizedReader ...

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2869#discussion_r230247574
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java ---
    @@ -347,9 +347,7 @@ private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
                 columnPage.getNullBits());
           } else if (vectorDataType == DataTypes.FLOAT) {
             float[] floatPage = columnPage.getFloatPage();
    -        for (int i = 0; i < pageSize; i++) {
    -          vector.putFloats(0, pageSize, floatPage, 0);
    -        }
    +        vector.putFloats(0, pageSize, floatPage, 0);
    --- End diff --
   
    Lines 322 to 334 should also be removed, as they are duplicated.


---

[GitHub] carbondata pull request #2869: [CARBONDATA-3057] Implement VectorizedReader ...

Github user xubo245 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2869#discussion_r230251185
 
    --- Diff: store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java ---
    @@ -1737,4 +1738,89 @@ public void testReadNextRowWithProjectionAndRowUtil() {
         }
       }
     
    +  @Test
    +  public void testVectorReader() {
    +    String path = "./testWriteFiles";
    +    try {
    +      FileUtils.deleteDirectory(new File(path));
    +
    +      Field[] fields = new Field[12];
    +      fields[0] = new Field("stringField", DataTypes.STRING);
    +      fields[1] = new Field("shortField", DataTypes.SHORT);
    +      fields[2] = new Field("intField", DataTypes.INT);
    +      fields[3] = new Field("longField", DataTypes.LONG);
    +      fields[4] = new Field("doubleField", DataTypes.DOUBLE);
    +      fields[5] = new Field("boolField", DataTypes.BOOLEAN);
    +      fields[6] = new Field("dateField", DataTypes.DATE);
    +      fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
    +      fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
    +      fields[9] = new Field("varcharField", DataTypes.VARCHAR);
    +      fields[10] = new Field("byteField", DataTypes.BYTE);
    +      fields[11] = new Field("floatField", DataTypes.FLOAT);
    +      Map<String, String> map = new HashMap<>();
    +      map.put("complex_delimiter_level_1", "#");
    +      CarbonWriter writer = CarbonWriter.builder()
    +          .outputPath(path)
    +          .withLoadOptions(map)
    +          .withCsvInput(new Schema(fields))
    +          .writtenBy("CarbonReaderTest")
    +          .build();
    +
    +      for (int i = 0; i < 10; i++) {
    +        String[] row2 = new String[]{
    +            "robot" + (i % 10),
    +            String.valueOf(i % 10000),
    +            String.valueOf(i),
    +            String.valueOf(Long.MAX_VALUE - i),
    +            String.valueOf((double) i / 2),
    +            String.valueOf(true),
    +            "2019-03-02",
    +            "2019-02-12 03:03:34",
    +            "12.345",
    +            "varchar",
    +            String.valueOf(i),
    +            "1.23"
    +        };
    +        writer.write(row2);
    +      }
    +      writer.close();
    +
    +      // Read data
    +      CarbonReader reader = CarbonReader
    +          .builder(path, "_temp")
    +          .withVectorReader(true)
    +          .build();
    +
    +      int i = 0;
    +      while (reader.hasNext()) {
    +        Object[] data = (Object[]) reader.readNextRow();
    +
    +        assert (RowUtil.getString(data, 0).equals("robot" + i));
    +        assertEquals(RowUtil.getShort(data, 4), i);
    +        assertEquals(RowUtil.getInt(data, 5), i);
    +        assert (RowUtil.getLong(data, 6) == Long.MAX_VALUE - i);
    +        assertEquals(RowUtil.getDouble(data, 7), ((double) i) / 2);
    +        assert (RowUtil.getByte(data, 8).equals(new Byte("1")));
    +        assertEquals(RowUtil.getInt(data, 1), 17957);
    +        assertEquals(RowUtil.getLong(data, 2), 1549920814000000L);
    +        assert (RowUtil.getDecimal(data, 9).equals("12.35"));
    +        assert (RowUtil.getString(data, 3).equals("varchar"));
    +        assertEquals(RowUtil.getByte(data, 10), new Byte(String.valueOf(i)));
    +        assertEquals(RowUtil.getFloat(data, 11), new Float("1.23"));
    +        i++;
    +      }
    +      reader.close();
    --- End diff --
   
    where?


---

[GitHub] carbondata pull request #2869: [CARBONDATA-3057] Implement VectorizedReader ...

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2869#discussion_r230251542
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java ---
    @@ -88,6 +99,50 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO
         }
       }
     
    +  /**
    +   * This method will list all the carbondata files in the table path and treat one carbondata
    +   * file as one split.
    +   */
    +  public List<InputSplit> getAllFileSplits(JobContext job) throws IOException {
    +    List<InputSplit> splits = new ArrayList<>();
    +    CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
    +    if (null == carbonTable) {
    +      throw new IOException("Missing/Corrupt schema file for table.");
    +    }
    +    for (CarbonFile carbonFile : getAllCarbonDataFiles(carbonTable.getTablePath())) {
    +      CarbonInputSplit split =
    +          new CarbonInputSplit("null", new Path(carbonFile.getAbsolutePath()), 0,
    +              carbonFile.getLength(), carbonFile.getLocations(), FileFormat.COLUMNAR_V3);
    +      split.setVersion(ColumnarFormatVersion.V3);
    +      BlockletDetailInfo info = new BlockletDetailInfo();
    +      split.setDetailInfo(info);
    +      info.setBlockSize(carbonFile.getLength());
    +      // Read the footer offset and set.
    +      FileReader reader = FileFactory
    --- End diff --
   
    Reading the file footer offset should not happen inside getSplits, as it will increase the getSplits time when there are many files. It should be handled during record reader initialization on the executor side, or inside the reader thread.
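
    A hypothetical sketch of the suggested refactor (accessor names are assumed
    from the diff, and readFooterOffset is a made-up helper): read the footer
    offset lazily when the record reader initializes, so getSplits stays cheap
    no matter how many files exist.

        @Override
        public void initialize(InputSplit inputSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
          CarbonInputSplit split = (CarbonInputSplit) inputSplit;
          BlockletDetailInfo info = split.getDetailInfo();
          if (info.getBlockFooterOffset() == 0L) {
            // readFooterOffset: hypothetical helper reading the trailing bytes
            // of the carbondata file, where the footer offset is recorded.
            info.setBlockFooterOffset(readFooterOffset(split.getPath(), info.getBlockSize()));
          }
          // ... continue with the existing query-model setup
        }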


---

[GitHub] carbondata pull request #2869: [CARBONDATA-3057] Implement VectorizedReader ...

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2869#discussion_r230251863
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.hadoop.util;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
    +import org.apache.carbondata.core.datastore.block.TableBlockInfo;
    +import org.apache.carbondata.core.metadata.datatype.DataType;
    +import org.apache.carbondata.core.metadata.datatype.DataTypes;
    +import org.apache.carbondata.core.metadata.datatype.DecimalType;
    +import org.apache.carbondata.core.metadata.datatype.StructField;
    +import org.apache.carbondata.core.scan.executor.QueryExecutor;
    +import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
    +import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
    +import org.apache.carbondata.core.scan.model.ProjectionDimension;
    +import org.apache.carbondata.core.scan.model.ProjectionMeasure;
    +import org.apache.carbondata.core.scan.model.QueryModel;
    +import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
    +import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
    +import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
    +import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
    +import org.apache.carbondata.core.util.ByteUtil;
    +import org.apache.carbondata.hadoop.AbstractRecordReader;
    +import org.apache.carbondata.hadoop.CarbonInputSplit;
    +
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.log4j.Logger;
    +
    +/**
    + * A specialized RecordReader that reads into CarbonColumnarBatches directly using the
    + * carbondata column APIs and fills the data directly into columns.
    + */
    +public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
    +
    +  private static final Logger LOGGER =
    +      LogServiceFactory.getLogService(CarbonVectorizedRecordReader.class.getName());
    +
    +  private CarbonColumnarBatch carbonColumnarBatch;
    +
    +  private QueryExecutor queryExecutor;
    +
    +  private int batchIdx = 0;
    +
    +  private int numBatched = 0;
    +
    +  private AbstractDetailQueryResultIterator iterator;
    +
    +  private QueryModel queryModel;
    +
    +  public CarbonVectorizedRecordReader(QueryModel queryModel) {
    +    this.queryModel = queryModel;
    +  }
    +
    +  @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
    +      throws IOException, InterruptedException {
    +    List<CarbonInputSplit> splitList;
    +    if (inputSplit instanceof CarbonInputSplit) {
    +      splitList = new ArrayList<>(1);
    +      splitList.add((CarbonInputSplit) inputSplit);
    +    } else {
    +      throw new RuntimeException("unsupported input split type: " + inputSplit);
    +    }
    +    List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
    +    queryModel.setTableBlockInfos(tableBlockInfoList);
    +    queryModel.setVectorReader(true);
    +    try {
    +      queryExecutor =
    +          QueryExecutorFactory.getQueryExecutor(queryModel, taskAttemptContext.getConfiguration());
    +      iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel);
    +    } catch (QueryExecutionException e) {
    +      LOGGER.error(e);
    +      throw new InterruptedException(e.getMessage());
    +    } catch (Exception e) {
    +      LOGGER.error(e);
    +      throw e;
    +    }
    +  }
    +
    +  @Override public boolean nextKeyValue() throws IOException, InterruptedException {
    +    initBatch();
    +    if (batchIdx >= numBatched) {
    +      if (!nextBatch()) return false;
    +    }
    +    ++batchIdx;
    +    return true;
    +  }
    +
    +
    +  private boolean nextBatch() {
    +    carbonColumnarBatch.reset();
    +    if (iterator.hasNext()) {
    +      iterator.processNextBatch(carbonColumnarBatch);
    +      numBatched = carbonColumnarBatch.getActualSize();
    +      batchIdx = 0;
    +      return true;
    +    }
    +    return false;
    +  }
    +
    +  private void initBatch() {
    +    if (carbonColumnarBatch == null) {
    +      List<ProjectionDimension> queryDimension = queryModel.getProjectionDimensions();
    +      List<ProjectionMeasure> queryMeasures = queryModel.getProjectionMeasures();
    +      StructField[] fields = new StructField[queryDimension.size() + queryMeasures.size()];
    +      for (ProjectionDimension dim : queryDimension) {
    +        fields[dim.getOrdinal()] =
    +            new StructField(dim.getColumnName(), dim.getDimension().getDataType());
    +      }
    +      for (ProjectionMeasure msr : queryMeasures) {
    +        DataType dataType = msr.getMeasure().getDataType();
    +        if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT
    +            || dataType == DataTypes.INT || dataType == DataTypes.LONG
    +            || dataType == DataTypes.FLOAT || dataType == DataTypes.BYTE) {
    +          fields[msr.getOrdinal()] =
    +              new StructField(msr.getColumnName(), msr.getMeasure().getDataType());
    +        } else if (DataTypes.isDecimal(dataType)) {
    +          fields[msr.getOrdinal()] = new StructField(msr.getColumnName(),
    +              new DecimalType(msr.getMeasure().getPrecision(), msr.getMeasure().getScale()));
    +        } else {
    +          fields[msr.getOrdinal()] = new StructField(msr.getColumnName(), DataTypes.DOUBLE);
    +        }
    +      }
    +      CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
    +      for (int i = 0; i < fields.length; i++) {
    +        vectors[i] = new CarbonColumnVectorImpl(
    +            CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT,
    +            fields[i].getDataType());
    +      }
    +      carbonColumnarBatch = new CarbonColumnarBatch(vectors,
    +          CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT,
    +          new boolean[] {});
    +    }
    +  }
    +
    +  @Override
    +  public Object getCurrentValue() throws IOException, InterruptedException {
    +    rowCount += 1;
    +    Object[] row = new Object[carbonColumnarBatch.columnVectors.length];
    +    for (int i = 0; i < carbonColumnarBatch.columnVectors.length; i ++) {
    +      if (carbonColumnarBatch.columnVectors[i].getType() == DataTypes.STRING
    +          || carbonColumnarBatch.columnVectors[i].getType() == DataTypes.VARCHAR) {
    +        byte[] data = (byte[]) carbonColumnarBatch.columnVectors[i].getData(batchIdx - 1);
    +        row[i] = ByteUtil.toString(data, 0, data.length);
    +      } else {
    +        row[i] = carbonColumnarBatch.columnVectors[i].getData(batchIdx - 1);
    +      }
    +    }
    +    return row;
    +  }
    +
    +  @Override public Void getCurrentKey() throws IOException, InterruptedException {
    +    return null;
    --- End diff --
   
    Better to throw an UnsupportedOperationException here.
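
    For example:

        @Override
        public Void getCurrentKey() {
          throw new UnsupportedOperationException(
              "getCurrentKey is not supported by CarbonVectorizedRecordReader");
        }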


---

[GitHub] carbondata pull request #2869: [CARBONDATA-3057] Implement VectorizedReader ...

Github user xubo245 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2869#discussion_r230252316
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.hadoop.util;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
    +import org.apache.carbondata.core.datastore.block.TableBlockInfo;
    +import org.apache.carbondata.core.metadata.datatype.DataType;
    +import org.apache.carbondata.core.metadata.datatype.DataTypes;
    +import org.apache.carbondata.core.metadata.datatype.DecimalType;
    +import org.apache.carbondata.core.metadata.datatype.StructField;
    +import org.apache.carbondata.core.scan.executor.QueryExecutor;
    +import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
    +import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
    +import org.apache.carbondata.core.scan.model.ProjectionDimension;
    +import org.apache.carbondata.core.scan.model.ProjectionMeasure;
    +import org.apache.carbondata.core.scan.model.QueryModel;
    +import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
    +import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
    +import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
    +import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
    +import org.apache.carbondata.core.util.ByteUtil;
    +import org.apache.carbondata.hadoop.AbstractRecordReader;
    +import org.apache.carbondata.hadoop.CarbonInputSplit;
    +
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.log4j.Logger;
    +
    +/**
    + * A specialized RecordReader that reads into CarbonColumnarBatches directly using the
    + * carbondata column APIs and fills the data directly into columns.
    + */
    +public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
    +
    +  private static final Logger LOGGER =
    +      LogServiceFactory.getLogService(CarbonVectorizedRecordReader.class.getName());
    +
    +  private CarbonColumnarBatch carbonColumnarBatch;
    +
    +  private QueryExecutor queryExecutor;
    +
    +  private int batchIdx = 0;
    +
    +  private int numBatched = 0;
    +
    +  private AbstractDetailQueryResultIterator iterator;
    +
    +  private QueryModel queryModel;
    +
    +  public CarbonVectorizedRecordReader(QueryModel queryModel) {
    +    this.queryModel = queryModel;
    +  }
    +
    +  @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
    +      throws IOException, InterruptedException {
    +    List<CarbonInputSplit> splitList;
    +    if (inputSplit instanceof CarbonInputSplit) {
    +      splitList = new ArrayList<>(1);
    +      splitList.add((CarbonInputSplit) inputSplit);
    +    } else {
    +      throw new RuntimeException("unsupported input split type: " + inputSplit);
    +    }
    +    List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
    +    queryModel.setTableBlockInfos(tableBlockInfoList);
    +    queryModel.setVectorReader(true);
    +    try {
    +      queryExecutor =
    +          QueryExecutorFactory.getQueryExecutor(queryModel, taskAttemptContext.getConfiguration());
    +      iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel);
    +      initBatch();
    +    } catch (QueryExecutionException e) {
    +      LOGGER.error(e);
    +      throw new InterruptedException(e.getMessage());
    +    } catch (Exception e) {
    +      LOGGER.error(e);
    +      throw e;
    +    }
    +  }
    +
    +  @Override public boolean nextKeyValue() throws IOException, InterruptedException {
    +    if (batchIdx >= numBatched) {
    +      if (!nextBatch()) return false;
    +    }
    +    ++batchIdx;
    +    return true;
    +  }
    +
    +
    +  private boolean nextBatch() {
    +    carbonColumnarBatch.reset();
    +    if (iterator.hasNext()) {
    +      iterator.processNextBatch(carbonColumnarBatch);
    +      numBatched = carbonColumnarBatch.getActualSize();
    +      batchIdx = 0;
    +      return true;
    +    }
    +    return false;
    +  }
    +
    +  private void initBatch() {
    +    if (carbonColumnarBatch == null) {
    +      List<ProjectionDimension> queryDimension = queryModel.getProjectionDimensions();
    +      List<ProjectionMeasure> queryMeasures = queryModel.getProjectionMeasures();
    +      StructField[] fields = new StructField[queryDimension.size() + queryMeasures.size()];
    +      for (ProjectionDimension dim : queryDimension) {
    +        fields[dim.getOrdinal()] =
    +            new StructField(dim.getColumnName(), dim.getDimension().getDataType());
    +      }
    +      for (ProjectionMeasure msr : queryMeasures) {
    +        DataType dataType = msr.getMeasure().getDataType();
    +        if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT
    +            || dataType == DataTypes.INT || dataType == DataTypes.LONG
    +            || dataType == DataTypes.FLOAT || dataType == DataTypes.BYTE) {
    +          fields[msr.getOrdinal()] =
    +              new StructField(msr.getColumnName(), msr.getMeasure().getDataType());
    +        } else if (DataTypes.isDecimal(dataType)) {
    +          fields[msr.getOrdinal()] = new StructField(msr.getColumnName(),
    +              new DecimalType(msr.getMeasure().getPrecision(), msr.getMeasure().getScale()));
    +        } else {
    +          fields[msr.getOrdinal()] = new StructField(msr.getColumnName(), DataTypes.DOUBLE);
    +        }
    +      }
    +      CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
    +      for (int i = 0; i < fields.length; i++) {
    +        vectors[i] = new CarbonColumnVectorImpl(
    +            CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT,
    +            fields[i].getDataType());
    +      }
    +      carbonColumnarBatch = new CarbonColumnarBatch(vectors,
    +          CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT,
    +          new boolean[] {});
    +    }
    +  }
    +
    +  @Override
    +  public Object getCurrentValue() throws IOException, InterruptedException {
    +    rowCount += 1;
    +    Object[] row = new Object[carbonColumnarBatch.columnVectors.length];
    +    for (int i = 0; i < carbonColumnarBatch.columnVectors.length; i ++) {
    +      if (carbonColumnarBatch.columnVectors[i].getType() == DataTypes.STRING
    +          || carbonColumnarBatch.columnVectors[i].getType() == DataTypes.VARCHAR) {
    +        byte[] data = (byte[]) carbonColumnarBatch.columnVectors[i].getData(batchIdx - 1);
    +        row[i] = ByteUtil.toString(data, 0, data.length);
    +      } else {
    +        row[i] = carbonColumnarBatch.columnVectors[i].getData(batchIdx - 1);
    +      }
    +    }
    +    return row;
    +  }
    +
    +  @Override public Void getCurrentKey() throws IOException, InterruptedException {
    --- End diff --
   
    Why do you need to throw IOException and InterruptedException here?
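
    (For reference, an override may declare fewer checked exceptions than the
    method it overrides, so the clause can simply be dropped:)

        @Override
        public Void getCurrentKey() {
          return null;
        }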


---

[GitHub] carbondata pull request #2869: [CARBONDATA-3057] Implement VectorizedReader ...

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2869#discussion_r230252544
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java ---
    @@ -88,6 +99,50 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO
         }
       }
     
    +  /**
    +   * This method will list all the carbondata files in the table path and treat one carbondata
    +   * file as one split.
    +   */
    +  public List<InputSplit> getAllFileSplits(JobContext job) throws IOException {
    --- End diff --
   
    Don't add more public methods. Instead, take a configuration property from outside and use it to decide whether to prune with the datamap or just list the splits plainly like this.


---

[GitHub] carbondata pull request #2869: [CARBONDATA-3057] Implement VectorizedReader ...

Github user xubo245 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2869#discussion_r230256502
 
    --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java ---
    @@ -158,14 +173,31 @@ public CarbonReaderBuilder withHadoopConf(String key, String value) {
         }
     
         try {
    -      final List<InputSplit> splits =
    -          format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
    -
    +      List<InputSplit> splits;
    +      if (filterExpression == null) {
    +        splits = format.getAllFileSplits(job);
    +      } else {
    +        splits = format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
    +      }
           List<RecordReader<Void, T>> readers = new ArrayList<>(splits.size());
           for (InputSplit split : splits) {
             TaskAttemptContextImpl attempt =
                 new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
    -        RecordReader reader = format.createRecordReader(split, attempt);
    +        RecordReader reader;
    +        QueryModel queryModel = format.createQueryModel(split, attempt);
    +        boolean hasComplex = false;
    +        for (ProjectionDimension projectionDimension : queryModel.getProjectionDimensions()) {
    +          if (projectionDimension.getDimension().isComplex()) {
    --- End diff --
   
    Can you support Array&lt;String&gt;? SDK/CSDK users need this data type.


---

[GitHub] carbondata pull request #2869: [CARBONDATA-3057] Implement VectorizedReader ...

Github user xubo245 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2869#discussion_r230264659
 
    --- Diff: store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java ---
    @@ -1723,6 +1724,92 @@ public void testReadNextRowWithProjectionAndRowUtil() {
             assertEquals(RowUtil.getFloat(data, 11), (float) 1.23);
             i++;
           }
    +      assert  (i == 10);
    +      reader.close();
    +    } catch (Throwable e) {
    +      e.printStackTrace();
    +      Assert.fail(e.getMessage());
    +    } finally {
    +      try {
    +        FileUtils.deleteDirectory(new File(path));
    +      } catch (IOException e) {
    +        e.printStackTrace();
    +        Assert.fail(e.getMessage());
    +      }
    +    }
    +  }
    +
    +  @Test
    +  public void testVectorReader() {
    +    String path = "./testWriteFiles";
    +    try {
    +      FileUtils.deleteDirectory(new File(path));
    +
    +      Field[] fields = new Field[12];
    +      fields[0] = new Field("stringField", DataTypes.STRING);
    +      fields[1] = new Field("shortField", DataTypes.SHORT);
    +      fields[2] = new Field("intField", DataTypes.INT);
    +      fields[3] = new Field("longField", DataTypes.LONG);
    +      fields[4] = new Field("doubleField", DataTypes.DOUBLE);
    +      fields[5] = new Field("boolField", DataTypes.BOOLEAN);
    +      fields[6] = new Field("dateField", DataTypes.DATE);
    +      fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
    +      fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
    +      fields[9] = new Field("varcharField", DataTypes.VARCHAR);
    +      fields[10] = new Field("byteField", DataTypes.BYTE);
    +      fields[11] = new Field("floatField", DataTypes.FLOAT);
    +      Map<String, String> map = new HashMap<>();
    +      map.put("complex_delimiter_level_1", "#");
    +      CarbonWriter writer = CarbonWriter.builder()
    +          .outputPath(path)
    +          .withLoadOptions(map)
    +          .withCsvInput(new Schema(fields))
    +          .writtenBy("CarbonReaderTest")
    +          .build();
    +
    +      for (int i = 0; i < 10; i++) {
    +        String[] row2 = new String[]{
    +            "robot" + (i % 10),
    +            String.valueOf(i % 10000),
    +            String.valueOf(i),
    +            String.valueOf(Long.MAX_VALUE - i),
    +            String.valueOf((double) i / 2),
    +            String.valueOf(true),
    +            "2019-03-02",
    +            "2019-02-12 03:03:34",
    +            "12.345",
    +            "varchar",
    +            String.valueOf(i),
    +            "1.23"
    +        };
    +        writer.write(row2);
    +      }
    +      writer.close();
    +
    +      // Read data
    +      CarbonReader reader = CarbonReader
    +          .builder(path, "_temp")
    +          .withVectorReader(true)
    +          .build();
    +
    +      int i = 0;
    +      while (reader.hasNext()) {
    +        Object[] data = (Object[]) reader.readNextRow();
    +
    +        assert (RowUtil.getString(data, 0).equals("robot" + i));
    +        assertEquals(RowUtil.getShort(data, 4), i);
    +        assertEquals(RowUtil.getInt(data, 5), i);
    +        assert (RowUtil.getLong(data, 6) == Long.MAX_VALUE - i);
    +        assertEquals(RowUtil.getDouble(data, 7), ((double) i) / 2);
    +        assert (RowUtil.getByte(data, 8).equals(new Byte("1")));
    --- End diff --
   
    Can you support getBoolean? We shouldn't return a byte for a boolean directly.
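
    A hypothetical shape for such a getter, mirroring the existing RowUtil helpers
    used in this test:

        public static boolean getBoolean(Object[] data, int ordinal) {
          // unboxing cast: the vector stores the value as a Boolean object
          return (boolean) data[ordinal];
        }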


---

[GitHub] carbondata pull request #2869: [CARBONDATA-3057] Implement VectorizedReader ...

Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2869#discussion_r230273015
 
    --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java ---
    @@ -158,14 +173,31 @@ public CarbonReaderBuilder withHadoopConf(String key, String value) {
         }
     
         try {
    -      final List<InputSplit> splits =
    -          format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
    -
    +      List<InputSplit> splits;
    +      if (filterExpression == null) {
    +        splits = format.getAllFileSplits(job);
    +      } else {
    +        splits = format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
    +      }
           List<RecordReader<Void, T>> readers = new ArrayList<>(splits.size());
           for (InputSplit split : splits) {
             TaskAttemptContextImpl attempt =
                 new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
    -        RecordReader reader = format.createRecordReader(split, attempt);
    +        RecordReader reader;
    +        QueryModel queryModel = format.createQueryModel(split, attempt);
    +        boolean hasComplex = false;
    +        for (ProjectionDimension projectionDimension : queryModel.getProjectionDimensions()) {
    +          if (projectionDimension.getDimension().isComplex()) {
    --- End diff --
   
    The vectorized reader is not supported for complex types. Once CarbonSession supports complex types, we can enable it for the SDK as well.


---

[GitHub] carbondata pull request #2869: [CARBONDATA-3057] Implement VectorizedReader ...

Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2869#discussion_r230273109
 
    --- Diff: store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java ---
    @@ -1723,6 +1724,92 @@ public void testReadNextRowWithProjectionAndRowUtil() {
             assertEquals(RowUtil.getFloat(data, 11), (float) 1.23);
             i++;
           }
    +      assert  (i == 10);
    +      reader.close();
    +    } catch (Throwable e) {
    +      e.printStackTrace();
    +      Assert.fail(e.getMessage());
    +    } finally {
    +      try {
    +        FileUtils.deleteDirectory(new File(path));
    +      } catch (IOException e) {
    +        e.printStackTrace();
    +        Assert.fail(e.getMessage());
    +      }
    +    }
    +  }
    +
    +  @Test
    +  public void testVectorReader() {
    +    String path = "./testWriteFiles";
    +    try {
    +      FileUtils.deleteDirectory(new File(path));
    +
    +      Field[] fields = new Field[12];
    +      fields[0] = new Field("stringField", DataTypes.STRING);
    +      fields[1] = new Field("shortField", DataTypes.SHORT);
    +      fields[2] = new Field("intField", DataTypes.INT);
    +      fields[3] = new Field("longField", DataTypes.LONG);
    +      fields[4] = new Field("doubleField", DataTypes.DOUBLE);
    +      fields[5] = new Field("boolField", DataTypes.BOOLEAN);
    +      fields[6] = new Field("dateField", DataTypes.DATE);
    +      fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
    +      fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
    +      fields[9] = new Field("varcharField", DataTypes.VARCHAR);
    +      fields[10] = new Field("byteField", DataTypes.BYTE);
    +      fields[11] = new Field("floatField", DataTypes.FLOAT);
    +      Map<String, String> map = new HashMap<>();
    +      map.put("complex_delimiter_level_1", "#");
    +      CarbonWriter writer = CarbonWriter.builder()
    +          .outputPath(path)
    +          .withLoadOptions(map)
    +          .withCsvInput(new Schema(fields))
    +          .writtenBy("CarbonReaderTest")
    +          .build();
    +
    +      for (int i = 0; i < 10; i++) {
    +        String[] row2 = new String[]{
    +            "robot" + (i % 10),
    +            String.valueOf(i % 10000),
    +            String.valueOf(i),
    +            String.valueOf(Long.MAX_VALUE - i),
    +            String.valueOf((double) i / 2),
    +            String.valueOf(true),
    +            "2019-03-02",
    +            "2019-02-12 03:03:34",
    +            "12.345",
    +            "varchar",
    +            String.valueOf(i),
    +            "1.23"
    +        };
    +        writer.write(row2);
    +      }
    +      writer.close();
    +
    +      // Read data
    +      CarbonReader reader = CarbonReader
    +          .builder(path, "_temp")
    +          .withVectorReader(true)
    +          .build();
    +
    +      int i = 0;
    +      while (reader.hasNext()) {
    +        Object[] data = (Object[]) reader.readNextRow();
    +
    +        assert (RowUtil.getString(data, 0).equals("robot" + i));
    +        assertEquals(RowUtil.getShort(data, 4), i);
    +        assertEquals(RowUtil.getInt(data, 5), i);
    +        assert (RowUtil.getLong(data, 6) == Long.MAX_VALUE - i);
    +        assertEquals(RowUtil.getDouble(data, 7), ((double) i) / 2);
    +        assert (RowUtil.getByte(data, 8).equals(new Byte("1")));
    --- End diff --
   
    ok


---

[GitHub] carbondata pull request #2869: [CARBONDATA-3057] Implement VectorizedReader ...

Github user KanakaKumar commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2869#discussion_r230274806
 
    --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java ---
    @@ -51,6 +54,7 @@
       private Expression filterExpression;
       private String tableName;
       private Configuration hadoopConf;
    +  private boolean useVectorReader;
    --- End diff --
   
    I suggest we make the vector reader the default, as we see a benefit in performance.
    withVectorReader() can then be deprecated and kept only to disable the vector reader in case of issues, until it is stabilized. @ravipesala, is that fine?
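
    A sketch of that suggestion (field and method names mirror the diff):

        private boolean useVectorReader = true;  // vector path on by default

        /**
         * @deprecated kept only as an escape hatch to disable the vector reader
         * until it is fully stabilized.
         */
        @Deprecated
        public CarbonReaderBuilder withVectorReader(boolean useVectorReader) {
          this.useVectorReader = useVectorReader;
          return this;
        }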


---

[GitHub] carbondata pull request #2869: [CARBONDATA-3057] Implement VectorizedReader ...

Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2869#discussion_r230277006
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java ---
    @@ -524,6 +524,22 @@ public DataOutputStream getDataOutputStreamUsingAppend(String path, FileFactory.
         return getFiles(listStatus);
       }
     
    +  @Override public List<CarbonFile> listFiles(Boolean recursive, CarbonFileFilter fileFilter)
    --- End diff --
   
    ok



---

[GitHub] carbondata pull request #2869: [CARBONDATA-3057] Implement VectorizedReader ...

Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2869#discussion_r230277015
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java ---
    @@ -347,9 +347,7 @@ private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
                 columnPage.getNullBits());
           } else if (vectorDataType == DataTypes.FLOAT) {
             float[] floatPage = columnPage.getFloatPage();
    -        for (int i = 0; i < pageSize; i++) {
    -          vector.putFloats(0, pageSize, floatPage, 0);
    -        }
    +        vector.putFloats(0, pageSize, floatPage, 0);
    --- End diff --
   
    removed


---

[GitHub] carbondata pull request #2869: [CARBONDATA-3057] Implement VectorizedReader ...

Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2869#discussion_r230277069
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java ---
    @@ -88,6 +99,50 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO
         }
       }
     
    +  /**
    +   * This method will list all the carbondata files in the table path and treat one carbondata
    +   * file as one split.
    +   */
    +  public List<InputSplit> getAllFileSplits(JobContext job) throws IOException {
    +    List<InputSplit> splits = new ArrayList<>();
    +    CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
    +    if (null == carbonTable) {
    +      throw new IOException("Missing/Corrupt schema file for table.");
    +    }
    +    for (CarbonFile carbonFile : getAllCarbonDataFiles(carbonTable.getTablePath())) {
    +      CarbonInputSplit split =
    +          new CarbonInputSplit("null", new Path(carbonFile.getAbsolutePath()), 0,
    +              carbonFile.getLength(), carbonFile.getLocations(), FileFormat.COLUMNAR_V3);
    +      split.setVersion(ColumnarFormatVersion.V3);
    +      BlockletDetailInfo info = new BlockletDetailInfo();
    +      split.setDetailInfo(info);
    +      info.setBlockSize(carbonFile.getLength());
    +      // Read the footer offset and set.
    +      FileReader reader = FileFactory
    --- End diff --
   
    moved to CarbonVectorizedRecordReader.initialize()


---

[GitHub] carbondata pull request #2869: [CARBONDATA-3057] Implement VectorizedReader ...

Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2869#discussion_r230277096
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java ---
    @@ -88,6 +99,50 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO
         }
       }
     
    +  /**
    +   * This method will list all the carbondata files in the table path and treat one carbondata
    +   * file as one split.
    +   */
    +  public List<InputSplit> getAllFileSplits(JobContext job) throws IOException {
    --- End diff --
   
    Added a "filter_blocks" flag in the job configuration.
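
    A sketch of how such a flag might be consulted (the exact wiring in the PR
    may differ, and the default value here is an assumption):

        boolean filterBlocks = job.getConfiguration().getBoolean("filter_blocks", true);
        List<InputSplit> splits = filterBlocks
            ? format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()))
            : format.getAllFileSplits(job);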


---

[GitHub] carbondata issue #2869: [CARBONDATA-3057] Implement VectorizedReader for SDK...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2869
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1233/



---

[GitHub] carbondata pull request #2869: [CARBONDATA-3057] Implement VectorizedReader ...

Github user xubo245 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2869#discussion_r230286721
 
    --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java ---
    @@ -158,14 +173,31 @@ public CarbonReaderBuilder withHadoopConf(String key, String value) {
         }
     
         try {
    -      final List<InputSplit> splits =
    -          format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
    -
    +      List<InputSplit> splits;
    +      if (filterExpression == null) {
    +        splits = format.getAllFileSplits(job);
    +      } else {
    +        splits = format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
    +      }
           List<RecordReader<Void, T>> readers = new ArrayList<>(splits.size());
           for (InputSplit split : splits) {
             TaskAttemptContextImpl attempt =
                 new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
    -        RecordReader reader = format.createRecordReader(split, attempt);
    +        RecordReader reader;
    +        QueryModel queryModel = format.createQueryModel(split, attempt);
    +        boolean hasComplex = false;
    +        for (ProjectionDimension projectionDimension : queryModel.getProjectionDimensions()) {
    +          if (projectionDimension.getDimension().isComplex()) {
    --- End diff --
   
    We should support it in this version or the next; it needs to be designed and implemented.


---

[GitHub] carbondata issue #2869: [CARBONDATA-3057] Implement VectorizedReader for SDK...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2869
 
    Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1448/



---