[ https://issues.apache.org/jira/browse/CARBONDATA-2?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15622411#comment-15622411 ] ASF GitHub Bot commented on CARBONDATA-2: ----------------------------------------- Github user jackylk commented on a diff in the pull request: https://github.com/apache/incubator-carbondata/pull/263#discussion_r85757712 --- Diff: integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java --- @@ -213,6 +224,64 @@ public static void executeGraph(CarbonLoadModel loadModel, String storeLocation, info, loadModel.getPartitionId(), loadModel.getCarbonDataLoadSchema()); } + public static void executeNewDataLoad(CarbonLoadModel loadModel, String storeLocation, + String hdfsStoreLocation, RecordReader<NullWritable, StringArrayWritable>[] recordReaders) + throws Exception { + if (!new File(storeLocation).mkdirs()) { + LOGGER.error("Error while creating the temp store path: " + storeLocation); + } + CarbonDataLoadConfiguration configuration = new CarbonDataLoadConfiguration(); + String databaseName = loadModel.getDatabaseName(); + String tableName = loadModel.getTableName(); + String tempLocationKey = databaseName + CarbonCommonConstants.UNDERSCORE + tableName + + CarbonCommonConstants.UNDERSCORE + loadModel.getTaskNo(); + CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation); + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS, hdfsStoreLocation); + // CarbonProperties.getInstance().addProperty("store_output_location", outPutLoc); + CarbonProperties.getInstance().addProperty("send.signal.load", "false"); + + CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable(); + AbsoluteTableIdentifier identifier = + carbonTable.getAbsoluteTableIdentifier(); + configuration.setTableIdentifier(identifier); + String csvHeader = loadModel.getCsvHeader(); + if (csvHeader != null && !csvHeader.isEmpty()) { + configuration.setHeader(CarbonDataProcessorUtil.getColumnFields(csvHeader, ",")); + } else { + CarbonFile csvFile = + CarbonDataProcessorUtil.getCsvFileToRead(loadModel.getFactFilesToProcess().get(0)); + configuration + .setHeader(CarbonDataProcessorUtil.getFileHeader(csvFile, loadModel.getCsvDelimiter())); + } + + configuration.setPartitionId(loadModel.getPartitionId()); + configuration.setSegmentId(loadModel.getSegmentId()); + configuration.setTaskNo(loadModel.getTaskNo()); + configuration.setDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS, + new String[] { loadModel.getComplexDelimiterLevel1(), + loadModel.getComplexDelimiterLevel2() }); + List<CarbonDimension> dimensions = + carbonTable.getDimensionByTableName(carbonTable.getFactTableName()); + List<CarbonMeasure> measures = + carbonTable.getMeasureByTableName(carbonTable.getFactTableName()); + DataField[] dataFields = new DataField[dimensions.size() + measures.size()]; + + int i = 0; + for (CarbonColumn column : dimensions) { + dataFields[i++] = new DataField(column); + } + for (CarbonColumn column : measures) { + dataFields[i++] = new DataField(column); + } + Iterator[] iterators = new RecordReaderIterator[recordReaders.length]; + configuration.setDataFields(dataFields); + for (int j = 0; j < recordReaders.length; j++) { + iterators[j] = new RecordReaderIterator(recordReaders[j]); + } + new DataLoadProcessExecutor().execute(configuration, iterators); --- End diff -- should have a CarbonTableOutputFormat and use it here, right? > Remove kettle for loading data > ------------------------------ > > Key: CARBONDATA-2 > URL: https://issues.apache.org/jira/browse/CARBONDATA-2 > Project: CarbonData > Issue Type: Improvement > Reporter: Liang Chen > Priority: Critical > Fix For: 0.3.0-incubating > > Attachments: CarbonDataLoadingdesign.pdf > > > Remove kettle for loading data module -- This message was sent by Atlassian JIRA (v6.3.4#6332) |
Free forum by Nabble | Edit this page |