[GitHub] carbondata pull request #2374: WIP: Support csv based carbon table


[GitHub] carbondata pull request #2374: WIP: Support csv based carbon table

qiuchenjian-2
GitHub user xuchuanyin opened a pull request:

    https://github.com/apache/carbondata/pull/2374

    WIP: Support csv based carbon table

    This PR is only an initial implementation and has some restrictions.
   
    1. Create a csv based carbon table using
    ```SQL
    CREATE TABLE fact_table (
      col1 bigint, col2 string, ..., col100 string)
    STORED BY 'CarbonData'
    TBLPROPERTIES(
      'format'='csv',
      'csv.delimiter'=',',
      'csv.header'='col1,col2,col100');
    ```
    2. Load data to this table using
    ```SQL
    ALTER TABLE fact_table
    ADD SEGMENT LOCATION 'path/to/data1'
    ```
    **Note**: *To reduce data movement, the statement above only maps the original csv files to a CarbonData segment.*
   
    3. Querying this table is no different from querying an ordinary carbon table.
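
The `csv.header` property in step 1 allows the column order in the csv files to differ from the column order of the table schema. The sketch below shows one way such a mapping could be resolved, loosely following the logic visible in the `CarbonCsvRecordReader` diff quoted later in this thread. `HeaderMapping` and `schemaToCsvIndex` are hypothetical names used only for illustration; they are not part of the PR.

```java
import java.util.Arrays;

/** Hypothetical helper: maps each schema column to its position in the csv header. */
final class HeaderMapping {
  private HeaderMapping() { }

  /**
   * @param schemaColumns column names in table (schema) order
   * @param csvHeader     value of the 'csv.header' property, or null if it is absent
   * @return result[i] is the csv ordinal of schema column i
   */
  static int[] schemaToCsvIndex(String[] schemaColumns, String csvHeader) {
    int[] mapping = new int[schemaColumns.length];
    if (csvHeader == null) {
      // no header given: assume the csv columns follow the schema order
      for (int i = 0; i < mapping.length; i++) {
        mapping[i] = i;
      }
      return mapping;
    }
    // -1 marks a schema column that does not appear in the header
    Arrays.fill(mapping, -1);
    String[] headerColumns = csvHeader.split(",");
    for (int csvIdx = 0; csvIdx < headerColumns.length; csvIdx++) {
      String name = headerColumns[csvIdx].trim();
      boolean found = false;
      for (int schemaIdx = 0; schemaIdx < schemaColumns.length; schemaIdx++) {
        if (name.equalsIgnoreCase(schemaColumns[schemaIdx])) {
          mapping[schemaIdx] = csvIdx;
          found = true;
          break;
        }
      }
      if (!found) {
        throw new IllegalArgumentException(
            String.format("Can not find csv header '%s' in table fields", name));
      }
    }
    return mapping;
  }
}
```

For a schema `col1, col2, col100` and a header `col100,col1,col2`, the first schema column would be read from csv position 1, the second from position 2, and the third from position 0.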
   
    Be sure to do all of the following checklist to help us incorporate
    your contribution quickly and easily:
   
     - [ ] Any interfaces changed?
     
     - [ ] Any backward compatibility impacted?
     
     - [ ] Document update required?
   
     - [ ] Testing done
            Please provide details on
            - Whether new unit test cases have been added or why no new tests are required?
            - How it is tested? Please attach test report.
            - Is it a performance related change? Please attach the performance test report.
            - Any additional information to help reviewers in testing this change.
           
     - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
   


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xuchuanyin/carbondata 0613_support_csv_table

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2374.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2374
   
----
commit e9fa7e6402c7584146a52542534e719ca64143c1
Author: xuchuanyin <xuchuanyin@...>
Date:   2018-06-13T01:03:28Z

    support create csv based carbontable

commit c71e9a5bda1ac23fa991a71e0e091f7814bd2117
Author: xuchuanyin <xuchuanyin@...>
Date:   2018-06-13T01:45:10Z

    support add segment for csv carbon table

commit 01a8f00b5a50c50c6dd7854bd1d5500ac484b6e6
Author: xuchuanyin <xuchuanyin@...>
Date:   2018-06-14T09:37:24Z

    Add csv record reader for csv carbon table

----


---

[GitHub] carbondata issue #2374: WIP: Support csv based carbon table

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2374
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6339/



---

[GitHub] carbondata issue #2374: WIP: Support csv based carbon table

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2374
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5177/



---

[GitHub] carbondata issue #2374: WIP: Support csv based carbon table

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2374
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5289/



---

[GitHub] carbondata issue #2374: WIP: Support csv based carbon table

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2374
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6341/



---

[GitHub] carbondata issue #2374: WIP: Support csv based carbon table

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2374
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5179/



---

[GitHub] carbondata issue #2374: WIP: Support csv based carbon table

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2374
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5291/



---

[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin closed the pull request at:

    https://github.com/apache/carbondata/pull/2374


---

[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...

qiuchenjian-2
In reply to this post by qiuchenjian-2
GitHub user xuchuanyin reopened a pull request:

    https://github.com/apache/carbondata/pull/2374

    [CARBONDATA-2613] Support csv based carbon table

    This PR is only a basic implementation and has some restrictions. It currently supports create, load, direct query, and drop on a csv based carbon table.
   
    1. Create a csv based carbon table using
    ```SQL
    CREATE TABLE fact_table (
      col1 bigint, col2 string, ..., col100 string)
    STORED BY 'CarbonData'
    TBLPROPERTIES(
      'format'='csv',
      'csv.delimiter'=',',
      'csv.header'='col1,col2,col100');
    ```
    2. Load data to this table using
    ```SQL
    ALTER TABLE fact_table
    ADD SEGMENT LOCATION 'path/to/data1'
    ```
    **Note**: *To reduce data movement, the statement above only maps the original csv files to a CarbonData segment.*
   
    3. Querying this table is no different from querying an ordinary carbon table.
   
    Be sure to do all of the following checklist to help us incorporate
    your contribution quickly and easily:
   
     - [ ] Any interfaces changed?
     
     - [x] Any backward compatibility impacted?
     `NO`
     - [x] Document update required?
    `NO, will do it once the feature is released`
     - [x] Testing done
            Please provide details on
            - Whether new unit test cases have been added or why no new tests are required?
    `basic tests added`
            - How it is tested? Please attach test report.
    `Tested in local machine`
            - Is it a performance related change? Please attach the performance test report.
    `NA`
            - Any additional information to help reviewers in testing this change.
           
     - [x] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
   


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xuchuanyin/carbondata 0613_support_csv_table

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2374.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2374
   
----
commit e9fa7e6402c7584146a52542534e719ca64143c1
Author: xuchuanyin <xuchuanyin@...>
Date:   2018-06-13T01:03:28Z

    support create csv based carbontable

commit c71e9a5bda1ac23fa991a71e0e091f7814bd2117
Author: xuchuanyin <xuchuanyin@...>
Date:   2018-06-13T01:45:10Z

    support add segment for csv carbon table

commit 01a8f00b5a50c50c6dd7854bd1d5500ac484b6e6
Author: xuchuanyin <xuchuanyin@...>
Date:   2018-06-14T09:37:24Z

    Add csv record reader for csv carbon table

commit 506a072e8c7df64e89da8b53b3b5195fa4b01a31
Author: xuchuanyin <xuchuanyin@...>
Date:   2018-06-14T15:00:35Z

    fix checkstyle

commit a9ff13027cf817c281de7030e36132f464abb3aa
Author: xuchuanyin <xuchuanyin@...>
Date:   2018-06-15T00:23:43Z

    support specifying csv properties

----


---

[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2374#discussion_r195680457
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java ---
    @@ -89,6 +90,15 @@
        *
        */
       private boolean isTransactionalTable = true;
    +  /**
    +   * The format of the fact table.
    +   * By default it is carbondata, and we also support other format like CSV
    +   */
    +  private String format = "carbondata";
    --- End diff --
   
    This class should be backward compatible; please make sure you handle the case where `format` is null.
    Please also add the `@Since("1.4.1")` annotation.
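
One way to address the null case is to fall back to the default in the getter. The sketch below assumes that `format` can be null when a schema written by an older version is loaded. `TableFormatHolder` is a hypothetical stand-in for the relevant part of `TableInfo`, not the actual class.

```java
/** Hypothetical stand-in for the format-related part of TableInfo. */
final class TableFormatHolder {
  static final String DEFAULT_FORMAT = "carbondata";

  // assumed to be null when the schema predates the introduction of this field
  private String format;

  void setFormat(String format) {
    this.format = format;
  }

  /** Never returns null: a missing format is treated as the default carbondata format. */
  String getFormat() {
    return format == null ? DEFAULT_FORMAT : format;
  }
}
```

Resolving the default in one accessor keeps callers free of their own null checks.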


---

[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2374#discussion_r195684966
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java ---
    @@ -291,6 +318,17 @@ public void write(DataOutput out) throws IOException {
           }
         }
         out.writeBoolean(isSchemaModified);
    +
    +    out.writeUTF(format);
    +    boolean isFormatPropertiesExists = null != formatProperties && formatProperties.size() > 0;
    +    out.writeBoolean(isFormatPropertiesExists);
    +    if (isFormatPropertiesExists) {
    +      out.writeShort(formatProperties.size());
    --- End diff --
   
    It would be better to write `formatProperties` in one shot; the performance is much better than writing each entry separately.
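
One possible reading of this suggestion is to encode the whole map into a single buffer and hand it to the `DataOutput` with one `write` call. The sketch below is written under that assumption; `PropertiesCodec` and its length-prefixed layout are hypothetical and are not taken from the PR.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical codec: serializes a property map as one length-prefixed byte block. */
final class PropertiesCodec {
  private PropertiesCodec() { }

  /** Encodes all entries into one in-memory buffer, then writes it to 'out' at once. */
  static void write(Map<String, String> properties, DataOutput out) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    DataOutputStream encoder = new DataOutputStream(buffer);
    encoder.writeInt(properties.size());
    for (Map.Entry<String, String> entry : properties.entrySet()) {
      encoder.writeUTF(entry.getKey());
      encoder.writeUTF(entry.getValue());
    }
    encoder.flush();
    byte[] bytes = buffer.toByteArray();
    out.writeInt(bytes.length); // length prefix so the reader knows how much to fetch
    out.write(bytes);           // the whole map in a single call
  }

  /** Reads back a block produced by write(). */
  static Map<String, String> read(DataInput in) throws IOException {
    byte[] bytes = new byte[in.readInt()];
    in.readFully(bytes);
    DataInputStream decoder = new DataInputStream(new ByteArrayInputStream(bytes));
    int size = decoder.readInt();
    Map<String, String> properties = new HashMap<>();
    for (int i = 0; i < size; i++) {
      String key = decoder.readUTF();
      String value = decoder.readUTF();
      properties.put(key, value);
    }
    return properties;
  }
}
```

An empty map is encoded as a block holding only the entry count, so a separate existence flag would not be needed with this layout.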


---

[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2374#discussion_r195686320
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java ---
    @@ -26,7 +26,10 @@
       COLUMNAR_V3,
     
       // carbondata row file format, optimized for write
    -  ROW_V1;
    +  ROW_V1,
    +
    +  // external file format, such as parquet/csv
    --- End diff --
   
    Please describe where the format string is stored (table property in TableInfo).


---

[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2374#discussion_r195686824
 
    --- Diff: hadoop/pom.xml ---
    @@ -39,6 +39,11 @@
           <artifactId>carbondata-processing</artifactId>
           <version>${project.version}</version>
         </dependency>
    +    <dependency>
    +      <groupId>org.apache.spark</groupId>
    +      <artifactId>spark-sql_${scala.binary.version}</artifactId>
    --- End diff --
   
    Why is this required?


---

[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2374#discussion_r195687057
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonCsvRecordReader.java ---
    @@ -0,0 +1,663 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.hadoop;
    +
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.io.Reader;
    +import java.math.BigInteger;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
    +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
    +import org.apache.carbondata.core.metadata.datatype.DataType;
    +import org.apache.carbondata.core.metadata.encoder.Encoding;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
    +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
    +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
    +import org.apache.carbondata.core.scan.filter.FilterUtil;
    +import org.apache.carbondata.core.scan.filter.GenericQueryType;
    +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
    +import org.apache.carbondata.core.scan.filter.intf.RowImpl;
    +import org.apache.carbondata.core.scan.filter.intf.RowIntf;
    +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
    +import org.apache.carbondata.core.scan.model.QueryModel;
    +import org.apache.carbondata.core.statusmanager.FileFormatProperties;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
    +import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
    +
    +import com.univocity.parsers.csv.CsvParser;
    +import com.univocity.parsers.csv.CsvParserSettings;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    +import org.apache.spark.memory.MemoryMode;
    +import org.apache.spark.sql.catalyst.InternalRow;
    +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
    +import org.apache.spark.sql.execution.vectorized.ColumnVector;
    +import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
    +import org.apache.spark.sql.types.CalendarIntervalType;
    +import org.apache.spark.sql.types.DataTypes;
    +import org.apache.spark.sql.types.Decimal;
    +import org.apache.spark.sql.types.DecimalType;
    +import org.apache.spark.sql.types.StructField;
    +import org.apache.spark.sql.types.StructType;
    +import org.apache.spark.unsafe.types.CalendarInterval;
    +import org.apache.spark.unsafe.types.UTF8String;
    +
    +/**
    + * scan csv file and filter on it
    + */
    +public class CarbonCsvRecordReader<T> extends AbstractRecordReader<T> {
    --- End diff --
   
    I think you can name it `CsvRecordReader`


---

[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2374#discussion_r195687377
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonCsvRecordReader.java ---
    @@ -0,0 +1,663 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.hadoop;
    +
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.io.Reader;
    +import java.math.BigInteger;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
    +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
    +import org.apache.carbondata.core.metadata.datatype.DataType;
    +import org.apache.carbondata.core.metadata.encoder.Encoding;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
    +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
    +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
    +import org.apache.carbondata.core.scan.filter.FilterUtil;
    +import org.apache.carbondata.core.scan.filter.GenericQueryType;
    +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
    +import org.apache.carbondata.core.scan.filter.intf.RowImpl;
    +import org.apache.carbondata.core.scan.filter.intf.RowIntf;
    +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
    +import org.apache.carbondata.core.scan.model.QueryModel;
    +import org.apache.carbondata.core.statusmanager.FileFormatProperties;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
    +import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
    +
    +import com.univocity.parsers.csv.CsvParser;
    +import com.univocity.parsers.csv.CsvParserSettings;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    +import org.apache.spark.memory.MemoryMode;
    +import org.apache.spark.sql.catalyst.InternalRow;
    +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
    +import org.apache.spark.sql.execution.vectorized.ColumnVector;
    +import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
    +import org.apache.spark.sql.types.CalendarIntervalType;
    +import org.apache.spark.sql.types.DataTypes;
    +import org.apache.spark.sql.types.Decimal;
    +import org.apache.spark.sql.types.DecimalType;
    +import org.apache.spark.sql.types.StructField;
    +import org.apache.spark.sql.types.StructType;
    +import org.apache.spark.unsafe.types.CalendarInterval;
    +import org.apache.spark.unsafe.types.UTF8String;
    +
    +/**
    + * scan csv file and filter on it
    + */
    +public class CarbonCsvRecordReader<T> extends AbstractRecordReader<T> {
    +  private static final LogService LOGGER = LogServiceFactory.getLogService(
    +      CarbonCsvRecordReader.class.getName());
    +  private static final int MAX_BATCH_SIZE = 32000;
    +
    +  // vector reader
    +  private boolean isVectorReader;
    +  private ColumnarBatch columnarBatch;
    +  private StructType outputSchema;
    +
    +  // metadata
    +  private CarbonTable carbonTable;
    +  private CarbonColumn[] carbonColumns;
    +  // input
    +  private QueryModel queryModel;
    +  private FileSplit fileSplit;
    +  private Configuration hadoopConf;
    +  // the index is schema ordinal, the value is the csv ordinal
    +  private int[] schema2csvIdx;
    +
    +  // filter
    +  private FilterExecuter filter;
    +  // the index is the dimension ordinal, the value is the schema ordinal
    +  private int[] filterColumn2SchemaIdx;
    +  private Object[] internalValues;
    +  private RowIntf internalRow;
    +
    +  // output
    +  private CarbonColumn[] projection;
    +  // the index is the projection column ordinal, the value is the schema ordinal
    +  private int[] projectionColumn2SchemaIdx;
    +  private Object[] outputValues;
    +  private Object[] finalOutputValues;
    +  private InternalRow outputRow;
    +
    +  // inputMetricsStats
    +  private InputMetricsStats inputMetricsStats;
    +
    +  // scan
    +  private Reader reader;
    +  private CsvParser csvParser;
    +
    +  public CarbonCsvRecordReader(QueryModel queryModel) {
    +    this.queryModel = queryModel;
    +  }
    +
    +  public CarbonCsvRecordReader(QueryModel queryModel, InputMetricsStats inputMetricsStats) {
    +    this(queryModel);
    +    this.inputMetricsStats = inputMetricsStats;
    +  }
    +
    +  public boolean isVectorReader() {
    +    return isVectorReader;
    +  }
    +
    +  public void setVectorReader(boolean vectorReader) {
    +    isVectorReader = vectorReader;
    +  }
    +
    +  public void setQueryModel(QueryModel queryModel) {
    +    this.queryModel = queryModel;
    +  }
    +
    +  public void setInputMetricsStats(InputMetricsStats inputMetricsStats) {
    +    this.inputMetricsStats = inputMetricsStats;
    +  }
    +
    +  @Override
    +  public void initialize(InputSplit split, TaskAttemptContext context)
    +      throws IOException, InterruptedException {
    +    if (split instanceof CarbonInputSplit) {
    +      fileSplit = (CarbonInputSplit) split;
    +    } else if (split instanceof CarbonMultiBlockSplit) {
    +      fileSplit = ((CarbonMultiBlockSplit) split).getAllSplits().get(0);
    +    } else {
    +      fileSplit = (FileSplit) split;
    +    }
    +
    +    hadoopConf = context.getConfiguration();
    +    if (queryModel == null) {
    +      CarbonTableInputFormat inputFormat = new CarbonTableInputFormat<Object>();
    +      queryModel = inputFormat.createQueryModel(split, context);
    +    }
    +
    +    carbonTable = queryModel.getTable();
    +
    +    // since the sequence of csv header, schema, carbon internal row, projection are different,
    +    // we need to init the column mappings
    +    initializedIdxMapping();
    +
    +    // init filter
    +    if (null != queryModel.getFilterExpressionResolverTree()) {
    +      initializeFilter();
    +    }
    +
    +    // init reading
    +    initializeCsvReader();
    +  }
    +
    +  private void initializedIdxMapping() {
    +    carbonColumns =
    +        carbonTable.getCreateOrderColumn(carbonTable.getTableName()).toArray(new CarbonColumn[0]);
    +    // for schema to csv mapping
    +    schema2csvIdx = new int[carbonColumns.length];
    +    if (!carbonTable.getTableInfo().getFormatProperties().containsKey(
    +        FileFormatProperties.CSV.HEADER)) {
    +      // if no header specified, it means that they are the same
    +      LOGGER.info("no header specified, will take the schema from table as header");
    +      for (int i = 0; i < carbonColumns.length; i++) {
    +        schema2csvIdx[i] = i;
    +      }
    +    } else {
    +      String[] csvHeader = carbonTable.getTableInfo().getFormatProperties().get(
    +          FileFormatProperties.CSV.HEADER).split(",");
    +      for (int i = 0; i < csvHeader.length; i++) {
    +        boolean found = false;
    +        for (int j = 0; j < carbonColumns.length; j++) {
    +          if (StringUtils.strip(csvHeader[i]).equalsIgnoreCase(carbonColumns[j].getColName())) {
    +            schema2csvIdx[carbonColumns[j].getSchemaOrdinal()] = i;
    +            found = true;
    +            break;
    +          }
    +        }
    +        if (!found) {
    +          throw new RuntimeException(
    +              String.format("Can not find csv header '%s' in table fields", csvHeader[i]));
    +        }
    +      }
    +    }
    +
    +    // for carbon internal row to schema mapping
    +    filterColumn2SchemaIdx = new int[carbonColumns.length];
    +    int filterIdx = 0;
    +    for (CarbonDimension dimension : carbonTable.getDimensions()) {
    +      filterColumn2SchemaIdx[filterIdx++] = dimension.getSchemaOrdinal();
    +    }
    +    for (CarbonMeasure measure : carbonTable.getMeasures()) {
    +      filterColumn2SchemaIdx[filterIdx++] = measure.getSchemaOrdinal();
    +    }
    +
    +    // for projects to schema mapping
    +    projection = queryModel.getProjectionColumns();
    +    projectionColumn2SchemaIdx = new int[projection.length];
    +
    +    for (int i = 0; i < projection.length; i++) {
    +      for (int j = 0; j < carbonColumns.length; j++) {
    +        if (projection[i].getColName().equals(carbonColumns[j].getColName())) {
    +          projectionColumn2SchemaIdx[i] = projection[i].getSchemaOrdinal();
    +          break;
    +        }
    +      }
    +    }
    +  }
    +
    +  private void initializeFilter() {
    +    List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
    +        .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()),
    +            carbonTable.getMeasureByTableName(carbonTable.getTableName()));
    +    int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
    +    for (int i = 0; i < dimLensWithComplex.length; i++) {
    +      dimLensWithComplex[i] = Integer.MAX_VALUE;
    +    }
    +
    +    int[] dictionaryColumnCardinality =
    +        CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList);
    +    SegmentProperties segmentProperties =
    +        new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality);
    +    Map<Integer, GenericQueryType> complexDimensionInfoMap = new HashMap<>();
    +
    +    FilterResolverIntf resolverIntf = queryModel.getFilterExpressionResolverTree();
    +    filter = FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties,
    +        complexDimensionInfoMap);
    +    // for row filter, we need update column index
    +    FilterUtil.updateIndexOfColumnExpression(resolverIntf.getFilterExpression(),
    +        carbonTable.getDimensionOrdinalMax());
    +  }
    +
    +  private void initializeCsvReader() throws IOException {
    +    internalValues = new Object[carbonColumns.length];
    +    internalRow = new RowImpl();
    +    internalRow.setValues(internalValues);
    +
    +    outputValues = new Object[projection.length];
    +    finalOutputValues = new Object[projection.length];
    +    outputRow = new GenericInternalRow(outputValues);
    +
    +    Path file = fileSplit.getPath();
    +    FileSystem fs = file.getFileSystem(hadoopConf);
    +    int bufferSize = Integer.parseInt(
    +        hadoopConf.get(CSVInputFormat.READ_BUFFER_SIZE, CSVInputFormat.READ_BUFFER_SIZE_DEFAULT));
    +    // note that here we read the whole file, not a split of the file
    +    FSDataInputStream fsStream = fs.open(file, bufferSize);
    +    reader = new InputStreamReader(fsStream, CarbonCommonConstants.DEFAULT_CHARSET);
    +    // use default csv settings first, then update it using user specified properties later
    +    CsvParserSettings settings = CSVInputFormat.extractCsvParserSettings(hadoopConf);
    +    initCsvSettings(settings);
    +    csvParser = new CsvParser(settings);
    +    csvParser.beginParsing(reader);
    +
    +    outputSchema = new StructType(convertCarbonColumnSpark(projection));
    +  }
    +
    +  /**
    +   * update the settings using properties from user
    +   */
    +  private void initCsvSettings(CsvParserSettings settings) {
    +    Map<String, String> csvProperties = carbonTable.getTableInfo().getFormatProperties();
    +
    +    if (csvProperties.containsKey(FileFormatProperties.CSV.DELIMITER)) {
    +      settings.getFormat().setDelimiter(
    +          csvProperties.get(FileFormatProperties.CSV.DELIMITER).charAt(0));
    +    }
    +
    +    if (csvProperties.containsKey(FileFormatProperties.CSV.COMMENT)) {
    +      settings.getFormat().setComment(
    +          csvProperties.get(FileFormatProperties.CSV.COMMENT).charAt(0));
    +    }
    +
    +    if (csvProperties.containsKey(FileFormatProperties.CSV.QUOTE)) {
    +      settings.getFormat().setQuote(
    +          csvProperties.get(FileFormatProperties.CSV.QUOTE).charAt(0));
    +    }
    +
    +    if (csvProperties.containsKey(FileFormatProperties.CSV.ESCAPE)) {
    +      settings.getFormat().setQuoteEscape(
    +          csvProperties.get(FileFormatProperties.CSV.ESCAPE).charAt(0));
    +    }
    +
    +    if (csvProperties.containsKey(FileFormatProperties.CSV.SKIP_EMPTY_LINE)) {
    +      settings.setSkipEmptyLines(
    +          Boolean.parseBoolean(csvProperties.get(FileFormatProperties.CSV.SKIP_EMPTY_LINE)));
    +    }
    +  }
    +
    +  private StructField[] convertCarbonColumnSpark(CarbonColumn[] columns) {
    +    StructField[] fields = new StructField[columns.length];
    +    for (int i = 0; i < columns.length; i++) {
    +      CarbonColumn carbonColumn = columns[i];
    +      fields[i] = new StructField(carbonColumn.getColName(),
    +          getSparkType4CarbonColumn(carbonColumn), true, null);
    +    }
    +    return fields;
    +  }
    +
    +  @Override
    +  public boolean nextKeyValue() throws IOException, InterruptedException {
    +    if (isVectorReader) {
    +      return nextColumnarBatch();
    +    }
    +
    +    return nextRow();
    +  }
    +
    +  private org.apache.spark.sql.types.DataType convertCarbonToSparkDataType(
    --- End diff --
   
    You can add a ReadSupport so that the upper layer converts the data type, instead of adding the conversion here.
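
The idea behind this comment can be sketched as follows: the record reader stays engine-agnostic and only yields raw values, while a pluggable read support supplied by the upper layer turns them into the engine's row type. `RowReadSupport`, `SimpleRecordReader`, and `StringRowReadSupport` are hypothetical, simplified names for illustration; they are not the actual CarbonData interfaces.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified read-support contract supplied by the upper layer. */
interface RowReadSupport<T> {
  T readRow(Object[] rawValues);
}

/** Engine-agnostic reader: delegates every type conversion to the read support. */
final class SimpleRecordReader<T> {
  private final List<Object[]> rawRows;
  private final RowReadSupport<T> readSupport;
  private int next = 0;

  SimpleRecordReader(List<Object[]> rawRows, RowReadSupport<T> readSupport) {
    this.rawRows = rawRows;
    this.readSupport = readSupport;
  }

  boolean hasNext() {
    return next < rawRows.size();
  }

  T nextRow() {
    return readSupport.readRow(rawRows.get(next++));
  }
}

/** Example upper-layer support: converts every value to its string form. */
final class StringRowReadSupport implements RowReadSupport<List<String>> {
  @Override
  public List<String> readRow(Object[] rawValues) {
    List<String> row = new ArrayList<>(rawValues.length);
    for (Object value : rawValues) {
      row.add(value == null ? null : value.toString());
    }
    return row;
  }
}
```

With this split, a Spark-specific support could build `InternalRow` objects, which might also remove the need for Spark imports in the reader itself.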


---

[GitHub] carbondata issue #2374: [CARBONDATA-2613] Support csv based carbon table

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2374
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6353/



---

[GitHub] carbondata issue #2374: [CARBONDATA-2613] Support csv based carbon table

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2374
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5191/



---

[GitHub] carbondata issue #2374: [CARBONDATA-2613] Support csv based carbon table

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2374
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5304/



---

[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2374#discussion_r196307354
 
    --- Diff: common/src/main/java/org/apache/carbondata/common/annotations/Since.java ---
    @@ -0,0 +1,38 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.common.annotations;
    +
    +import java.lang.annotation.Documented;
    +import java.lang.annotation.ElementType;
    +import java.lang.annotation.Retention;
    +import java.lang.annotation.RetentionPolicy;
    +import java.lang.annotation.Target;
    +
    +/**
    + * The annotation indicates that the version number since a member or a type has been present.
    + */
    +@Documented
    +@Retention(RetentionPolicy.RUNTIME)
    +@Target({ElementType.FIELD, ElementType.TYPE})
    --- End diff --
   
    Add `ElementType.METHOD` also
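
A minimal sketch of the annotation with `ElementType.METHOD` added, together with a reflective lookup on a method. The diff above is truncated before the annotation body, so the single `String value()` element is an assumption inferred from the `@Since("1.4.1")` usage suggested earlier in this thread; `SinceDemo` is a hypothetical class for illustration.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** Records the version in which a field, type, or method first appeared. */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.TYPE, ElementType.METHOD})
@interface Since {
  // assumed element; the original diff does not show the annotation body
  String value();
}

/** Hypothetical class that uses the annotation on a method. */
final class SinceDemo {
  @Since("1.4.1")
  String getFormat() {
    return "carbondata";
  }

  /** Looks up the @Since value of a no-argument method declared in this class. */
  static String sinceOf(String methodName) throws NoSuchMethodException {
    return SinceDemo.class.getDeclaredMethod(methodName).getAnnotation(Since.class).value();
  }
}
```

Without `ElementType.METHOD` in `@Target`, annotating `getFormat()` as above would be rejected by the compiler.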


---

[GitHub] carbondata issue #2374: [CARBONDATA-2613] Support csv based carbon table

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2374
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6365/



---