Posted by
Akash R Nilugal (Jira) on
Sep 28, 2016; 2:31pm
URL: http://apache-carbondata-dev-mailing-list-archive.168.s1.nabble.com/jira-Commented-CARBONDATA-279-DataLoading-Save-a-DataFrame-to-CarbonData-file-without-writing-CSV-fie-tp1548.html
[
https://issues.apache.org/jira/browse/CARBONDATA-279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529824#comment-15529824 ]
ASF GitHub Bot commented on CARBONDATA-279:
-------------------------------------------
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/203#discussion_r80929523
--- Diff: processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java ---
@@ -343,41 +349,57 @@ public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws K
} catch (NumberFormatException exc) {
numberOfNodes = NUM_CORES_DEFAULT_VAL;
}
+ if ( rddIteratorKey == null ) {
+ BlockDetails[] blocksInfo = GraphGenerator.blockInfo.get(meta.getBlocksID());
+ if (blocksInfo.length == 0) {
+ //if isDirectLoad = true, and partition number > file num
+ //then blocksInfo will get empty in some partition processing, so just return
+ setOutputDone();
+ return false;
+ }
- BlockDetails[] blocksInfo = GraphGenerator.blockInfo.get(meta.getBlocksID());
- if (blocksInfo.length == 0) {
- //if isDirectLoad = true, and partition number > file num
- //then blocksInfo will get empty in some partition processing, so just return
- setOutputDone();
- return false;
- }
-
- if (numberOfNodes > blocksInfo.length) {
- numberOfNodes = blocksInfo.length;
- }
+ if (numberOfNodes > blocksInfo.length) {
+ numberOfNodes = blocksInfo.length;
+ }
- //new the empty lists
- for (int pos = 0; pos < numberOfNodes; pos++) {
- threadBlockList.add(new ArrayList<BlockDetails>());
- }
+ //new the empty lists
+ for (int pos = 0; pos < numberOfNodes; pos++) {
+ threadBlockList.add(new ArrayList<BlockDetails>());
+ }
- //block balance to every thread
- for (int pos = 0; pos < blocksInfo.length; ) {
- for (int threadNum = 0; threadNum < numberOfNodes; threadNum++) {
- if (pos < blocksInfo.length) {
- threadBlockList.get(threadNum).add(blocksInfo[pos++]);
+ //block balance to every thread
+ for (int pos = 0; pos < blocksInfo.length; ) {
+ for (int threadNum = 0; threadNum < numberOfNodes; threadNum++) {
+ if (pos < blocksInfo.length) {
+ threadBlockList.get(threadNum).add(blocksInfo[pos++]);
+ }
}
}
+ LOGGER.info("*****************Started all csv reading***********");
+ startProcess(numberOfNodes);
+ LOGGER.info("*****************Completed all csv reading***********");
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordCsvInputStepTime(
+ meta.getPartitionID(), System.currentTimeMillis());
+ } else {
+ scanRddIterator();
}
- LOGGER.info("*****************Started all csv reading***********");
- startProcess(numberOfNodes);
- LOGGER.info("*****************Completed all csv reading***********");
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordCsvInputStepTime(
- meta.getPartitionID(), System.currentTimeMillis());
setOutputDone();
return false;
}
+ private void scanRddIterator() throws RuntimeException {
+ Iterator<String[]> iterator = RddInputUtils.getAndRemove(rddIteratorKey);
+ if (iterator != null) {
+ try{
+ while(iterator.hasNext()){
+ putRow(data.outputRowMeta, iterator.next());
--- End diff --
One suggestion:
Instead of calling putRow here, perhaps we can execute more steps here until reaching merge step which need to wait.
I suggest to raise another PR to implement this improvement.
> [DataLoading]Save a DataFrame to CarbonData file without writing CSV file
> -------------------------------------------------------------------------
>
> Key: CARBONDATA-279
> URL:
https://issues.apache.org/jira/browse/CARBONDATA-279> Project: CarbonData
> Issue Type: Improvement
> Affects Versions: 0.1.0-incubating
> Reporter: QiangCai
> Assignee: QiangCai
> Priority: Minor
> Fix For: 0.2.0-incubating
>
>
> Directly save a DataFrame to CarbonData file without writing CSV file
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)