[ https://issues.apache.org/jira/browse/CARBONDATA-279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15554261#comment-15554261 ] ASF GitHub Bot commented on CARBONDATA-279: ------------------------------------------- Github user QiangCai commented on a diff in the pull request: https://github.com/apache/incubator-carbondata/pull/203#discussion_r82334120 --- Diff: processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java --- @@ -343,41 +349,57 @@ public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws K } catch (NumberFormatException exc) { numberOfNodes = NUM_CORES_DEFAULT_VAL; } + if ( rddIteratorKey == null ) { + BlockDetails[] blocksInfo = GraphGenerator.blockInfo.get(meta.getBlocksID()); + if (blocksInfo.length == 0) { + //if isDirectLoad = true, and partition number > file num + //then blocksInfo will get empty in some partition processing, so just return + setOutputDone(); + return false; + } - BlockDetails[] blocksInfo = GraphGenerator.blockInfo.get(meta.getBlocksID()); - if (blocksInfo.length == 0) { - //if isDirectLoad = true, and partition number > file num - //then blocksInfo will get empty in some partition processing, so just return - setOutputDone(); - return false; - } - - if (numberOfNodes > blocksInfo.length) { - numberOfNodes = blocksInfo.length; - } + if (numberOfNodes > blocksInfo.length) { + numberOfNodes = blocksInfo.length; + } - //new the empty lists - for (int pos = 0; pos < numberOfNodes; pos++) { - threadBlockList.add(new ArrayList<BlockDetails>()); - } + //new the empty lists + for (int pos = 0; pos < numberOfNodes; pos++) { + threadBlockList.add(new ArrayList<BlockDetails>()); + } - //block balance to every thread - for (int pos = 0; pos < blocksInfo.length; ) { - for (int threadNum = 0; threadNum < numberOfNodes; threadNum++) { - if (pos < blocksInfo.length) { - threadBlockList.get(threadNum).add(blocksInfo[pos++]); + //block balance to every thread + for (int pos = 0; pos < blocksInfo.length; ) { + for (int threadNum = 0; threadNum < numberOfNodes; threadNum++) { + if (pos < blocksInfo.length) { + threadBlockList.get(threadNum).add(blocksInfo[pos++]); + } } } + LOGGER.info("*****************Started all csv reading***********"); + startProcess(numberOfNodes); + LOGGER.info("*****************Completed all csv reading***********"); + CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordCsvInputStepTime( + meta.getPartitionID(), System.currentTimeMillis()); + } else { + scanRddIterator(); } - LOGGER.info("*****************Started all csv reading***********"); - startProcess(numberOfNodes); - LOGGER.info("*****************Completed all csv reading***********"); - CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordCsvInputStepTime( - meta.getPartitionID(), System.currentTimeMillis()); setOutputDone(); return false; } + private void scanRddIterator() throws RuntimeException { + Iterator<String[]> iterator = RddInputUtils.getAndRemove(rddIteratorKey); + if (iterator != null) { + try{ + while(iterator.hasNext()){ + putRow(data.outputRowMeta, iterator.next()); --- End diff -- I agree, we can merge kettle steps. > [DataLoading]Save a DataFrame to CarbonData file without writing CSV file > ------------------------------------------------------------------------- > > Key: CARBONDATA-279 > URL: https://issues.apache.org/jira/browse/CARBONDATA-279 > Project: CarbonData > Issue Type: Improvement > Affects Versions: 0.1.0-incubating > Reporter: QiangCai > Assignee: QiangCai > Priority: Minor > Fix For: 0.2.0-incubating > > > Directly save a DataFrame to CarbonData file without writing CSV file -- This message was sent by Atlassian JIRA (v6.3.4#6332) |
Free forum by Nabble | Edit this page |