[ https://issues.apache.org/jira/browse/CARBONDATA-298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591236#comment-15591236 ] ASF GitHub Bot commented on CARBONDATA-298: ------------------------------------------- Github user jackylk commented on a diff in the pull request: https://github.com/apache/incubator-carbondata/pull/240#discussion_r84232604 --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java --- @@ -0,0 +1,171 @@ +package org.apache.carbondata.processing.newflow.steps.input; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep; +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration; +import org.apache.carbondata.processing.newflow.DataField; +import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants; +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; +import org.apache.carbondata.processing.newflow.parser.CarbonParserFactory; +import org.apache.carbondata.processing.newflow.parser.GenericParser; +import org.apache.carbondata.processing.newflow.row.CarbonRow; +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch; + +/** + * It reads data from record reader and sends data to next step. 
+ */ +public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(InputProcessorStepImpl.class.getName()); + + private GenericParser[] genericParsers; + + private List<Iterator<Object[]>> inputIterators; + + public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration, + AbstractDataLoadProcessorStep child, List<Iterator<Object[]>> inputIterators) { + super(configuration, child); + this.inputIterators = inputIterators; + } + + @Override public DataField[] getOutput() { + DataField[] fields = configuration.getDataFields(); + String[] header = configuration.getHeader(); + DataField[] output = new DataField[fields.length]; + int k = 0; + for (int i = 0; i < header.length; i++) { + for (int j = 0; j < fields.length; j++) { + if (header[j].equalsIgnoreCase(fields[j].getColumn().getColName())) { + output[k++] = fields[j]; + break; + } + } + } + return output; + } + + @Override public void initialize() throws CarbonDataLoadingException { + DataField[] output = getOutput(); + genericParsers = new GenericParser[output.length]; + for (int i = 0; i < genericParsers.length; i++) { + genericParsers[i] = CarbonParserFactory.createParser(output[i].getColumn(), + (String[]) configuration + .getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS)); + } + } + + private int getNumberOfCores() { + int numberOfCores; + try { + numberOfCores = Integer.parseInt(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.NUM_CORES_LOADING, + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)); + } catch (NumberFormatException exc) { + numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); + } + return numberOfCores; + } + + private int getBatchSize() { + int batchSize; + try { + batchSize = Integer.parseInt(configuration + .getDataLoadProperty(DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE, + 
DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE_DEFAULT).toString()); + } catch (NumberFormatException exc) { + batchSize = Integer.parseInt(DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE_DEFAULT); + } + return batchSize; + } + + @Override public Iterator<CarbonRowBatch>[] execute() { + int batchSize = getBatchSize(); + List<Iterator<Object[]>>[] readerIterators = partitionInputReaderIterators(); + Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length]; + for (int i = 0; i < outIterators.length; i++) { + outIterators[i] = new InputProcessorIterator(readerIterators[i], genericParsers, batchSize); + } + return outIterators; + } + + private List<Iterator<Object[]>>[] partitionInputReaderIterators() { + int numberOfCores = getNumberOfCores(); + if (inputIterators.size() < numberOfCores) { + numberOfCores = inputIterators.size(); --- End diff -- Please use a different name for this variable (suggestion: `parallelism`). After this check it holds `min(inputIterators.size(), numberOfCores)`, not the number of cores. > 3. Add InputProcessorStep which should iterate recordreader and parse the data as per the data type. > ---------------------------------------------------------------------------------------------------- > > Key: CARBONDATA-298 > URL: https://issues.apache.org/jira/browse/CARBONDATA-298 > Project: CarbonData > Issue Type: Sub-task > Reporter: Ravindra Pesala > Fix For: 0.2.0-incubating > > > Add InputProcessorStep which should iterate recordreader/RecordBufferedWriter and parse the data as per the data types. -- This message was sent by Atlassian JIRA (v6.3.4#6332) |
Free forum by Nabble | Edit this page |