[jira] [Commented] (CARBONDATA-298) 3. Add InputProcessorStep which should iterate recordreader and parse the data as per the data type.


Akash R Nilugal (Jira)

    [ https://issues.apache.org/jira/browse/CARBONDATA-298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591248#comment-15591248 ]

ASF GitHub Bot commented on CARBONDATA-298:
-------------------------------------------

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/240#discussion_r84231719
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---
    @@ -0,0 +1,171 @@
    +package org.apache.carbondata.processing.newflow.steps.input;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.CarbonIterator;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
    +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.newflow.DataField;
    +import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
    +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
    +import org.apache.carbondata.processing.newflow.parser.CarbonParserFactory;
    +import org.apache.carbondata.processing.newflow.parser.GenericParser;
    +import org.apache.carbondata.processing.newflow.row.CarbonRow;
    +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
    +
    +/**
    + * It reads data from record reader and sends data to next step.
    + */
    +public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(InputProcessorStepImpl.class.getName());
    +
    +  private GenericParser[] genericParsers;
    +
    +  private List<Iterator<Object[]>> inputIterators;
    +
    +  public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
    +      AbstractDataLoadProcessorStep child, List<Iterator<Object[]>> inputIterators) {
    +    super(configuration, child);
    +    this.inputIterators = inputIterators;
    +  }
    +
    +  @Override public DataField[] getOutput() {
    +    DataField[] fields = configuration.getDataFields();
    +    String[] header = configuration.getHeader();
    +    DataField[] output = new DataField[fields.length];
    +    int k = 0;
    +    for (int i = 0; i < header.length; i++) {
    +      for (int j = 0; j < fields.length; j++) {
    +        if (header[i].equalsIgnoreCase(fields[j].getColumn().getColName())) {
    +          output[k++] = fields[j];
    +          break;
    +        }
    +      }
    +    }
    +    return output;
    +  }
    +
    +  @Override public void initialize() throws CarbonDataLoadingException {
    +    DataField[] output = getOutput();
    +    genericParsers = new GenericParser[output.length];
    +    for (int i = 0; i < genericParsers.length; i++) {
    +      genericParsers[i] = CarbonParserFactory.createParser(output[i].getColumn(),
    +          (String[]) configuration
    +              .getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS));
    +    }
    +  }
    +
    +  private int getNumberOfCores() {
    --- End diff --
   
    I think helper functions such as `getNumberOfCores()` can be shared across the project, so consider moving them into CarbonProperties directly, for example as `CarbonProperties.numberOfCores()` and `CarbonProperties.batchSize()`.
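
    A minimal sketch of what such shared accessors might look like. It uses a hypothetical stand-in class (`SharedLoadSettings`) backed by `java.util.Properties`, not the real CarbonProperties API. The property keys and default values are assumptions made up for illustration; only the method names `numberOfCores()` and `batchSize()` come from the suggestion above.

```java
import java.util.Properties;

/**
 * Hypothetical stand-in for a project-wide properties holder.
 * This is NOT the real CarbonProperties class; the keys and defaults
 * below are assumptions chosen only for this example.
 */
final class SharedLoadSettings {

  // Assumed key names and defaults (illustrative, not CarbonData constants).
  static final String NUM_CORES_KEY = "example.loading.cores";
  static final String BATCH_SIZE_KEY = "example.loading.batch.size";
  static final int DEFAULT_NUM_CORES = 2;
  static final int DEFAULT_BATCH_SIZE = 1000;

  private final Properties properties;

  SharedLoadSettings(Properties properties) {
    this.properties = properties;
  }

  /** Number of cores to use for loading, or the default if unset or invalid. */
  int numberOfCores() {
    return readPositiveInt(NUM_CORES_KEY, DEFAULT_NUM_CORES);
  }

  /** Rows per batch, or the default if unset or invalid. */
  int batchSize() {
    return readPositiveInt(BATCH_SIZE_KEY, DEFAULT_BATCH_SIZE);
  }

  // Parses a positive integer property, falling back to the default when the
  // value is missing, not a number, or not greater than zero.
  private int readPositiveInt(String key, int defaultValue) {
    String raw = properties.getProperty(key);
    if (raw == null) {
      return defaultValue;
    }
    try {
      int value = Integer.parseInt(raw.trim());
      return value > 0 ? value : defaultValue;
    } catch (NumberFormatException e) {
      return defaultValue;
    }
  }
}
```

    With the parsing and fallback logic in one place, each processing step could call the shared accessors instead of keeping its own private copy of `getNumberOfCores()`.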


> 3. Add InputProcessorStep which should iterate recordreader and parse the data as per the data type.
> ----------------------------------------------------------------------------------------------------
>
>                 Key: CARBONDATA-298
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-298
>             Project: CarbonData
>          Issue Type: Sub-task
>            Reporter: Ravindra Pesala
>             Fix For: 0.2.0-incubating
>
>
> Add InputProcessorStep which should iterate recordreader/RecordBufferedWriter and parse the data as per the data types.


