Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157332993

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java ---
@@ -0,0 +1,101 @@
[new file; ASF license header, package declaration and imports elided]

+/**
+ * Outputcommitter which manages the segments during loading.
+ */
+public class CarbonOutputCommitter extends FileOutputCommitter {
+
+  public CarbonOutputCommitter(Path outputPath, TaskAttemptContext context) throws IOException {
+    super(outputPath, context);
+  }
+
+  /**
+   * Update the tablestatus with inprogress while setup the job.
+   *
+   * @param context
+   * @throws IOException
+   */
+  @Override public void setupJob(JobContext context) throws IOException {
+    super.setupJob(context);
+    boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration());
+    CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration());
+    try {
+      CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, overwriteSet);
+    } catch (InterruptedException e) {
--- End diff --

ok

---
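For context on why the committer touches the table status in exactly these hooks: the Hadoop commit protocol drives setupJob, commitJob and abortJob in a fixed order, which is what lets segment state mirror the load lifecycle. The sketch below uses the real org.apache.hadoop.mapreduce.OutputCommitter API, but the driver method itself is illustrative only (in practice the MR application master makes these calls):

import java.io.IOException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;

class CommitProtocolSketch {
  // Illustrative driver showing the order of the hooks CarbonOutputCommitter
  // overrides, and the table-status transition each one performs.
  static void run(OutputCommitter committer, JobContext job) throws IOException {
    committer.setupJob(job);       // Carbon: mark segment load IN PROGRESS
    try {
      // ... tasks execute and write the segment's data files ...
      committer.commitJob(job);    // Carbon: mark segment SUCCESS, record sizes
    } catch (IOException e) {
      committer.abortJob(job, JobStatus.State.FAILED);  // Carbon: mark load failed
      throw e;
    }
  }
}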
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157333001

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java ---
[same new file as above; license header, imports and setupJob() elided]

+  /**
+   * Update the tablestatus as success after job is success
+   *
+   * @param context
+   * @throws IOException
+   */
+  @Override public void commitJob(JobContext context) throws IOException {
+    super.commitJob(context);
+    boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration());
+    CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration());
+    try {
+      LoadMetadataDetails newMetaEntry =
+          loadModel.getLoadMetadataDetails().get(loadModel.getLoadMetadataDetails().size() - 1);
+      CarbonLoaderUtil.populateNewLoadMetaEntry(newMetaEntry, SegmentStatus.SUCCESS,
+          loadModel.getFactTimeStamp(), true);
+      CarbonUtil.addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(),
+          loadModel.getCarbonDataLoadSchema().getCarbonTable());
+      CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, overwriteSet);
+    } catch (InterruptedException e) {
--- End diff --

ok

---
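A side note on the catch blocks reviewed above: the diff converts the checked InterruptedException into an IOException so the inherited OutputCommitter signatures still hold. A minimal sketch of that idiom, including one common refinement that is not in the diff (restoring the thread's interrupt flag before rethrowing):

import java.io.IOException;

class InterruptWrapSketch {
  interface InterruptibleAction {
    void run() throws InterruptedException;
  }

  static void call(InterruptibleAction action) throws IOException {
    try {
      action.run();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();  // preserve interruption status
      throw new IOException(e);            // matches the wrapping in the diff
    }
  }
}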
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157333012

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java ---
[same new file as above; earlier context elided]

+  /**
+   * Update the tablestatus as fail if any fail happens.
+   *
+   * @param context
+   * @param state
+   * @throws IOException
+   */
+  @Override public void abortJob(JobContext context, JobStatus.State state) throws IOException {
+    super.abortJob(context, state);
+    CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration());
+    try {
+      CarbonLoaderUtil.updateTableStatusForFailure(loadModel);
+    } catch (InterruptedException e) {
--- End diff --

ok

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157333060

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java ---
[same new file as above; earlier context elided]

+  /**
+   * Update the tablestatus as fail if any fail happens.
+   *
+   * @param context
+   * @param state
+   * @throws IOException
+   */
+  @Override public void abortJob(JobContext context, JobStatus.State state) throws IOException {
--- End diff --

ok

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157333109

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java ---
@@ -18,22 +18,311 @@
[import changes elided]

 /**
  * Base class for all output format for CarbonData file.
- * @param <T>
  */
-public abstract class CarbonTableOutputFormat<T> extends FileOutputFormat<Void, T> {
+public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, StringArrayWritable> {
+
+  private static final String LOAD_MODEL = "mapreduce.carbonoutputformat.load.mmodel";
--- End diff --

ok

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157333123

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java ---
[same hunk as above, anchored on the same line]

+  private static final String LOAD_MODEL = "mapreduce.carbonoutputformat.load.mmodel";
--- End diff --

ok

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157333237

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java ---
[same file as above; the configuration-key constants, setters/getters,
getOutputCommitter() and getRecordWriter() elided]

+  public static CarbonLoadModel getLoadModel(Configuration conf) throws IOException {
+    CarbonLoadModel model;
--- End diff --

But it can be accessed only through static method so we can't cache it.

---
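On the caching question: the load model lives in the job Configuration only as an encoded string, and callers reach it through the static getLoadModel(conf), so there is no instance on which a decoded copy could be cached. The sketch below approximates the round trip a utility like ObjectSerializationUtil performs (an assumption about its internals — the real implementation may also compress the bytes):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Base64;

class ConfSerializationSketch {
  // Java-serialize and Base64-encode, so the object can be carried as a plain
  // string value inside a Hadoop Configuration.
  static String encode(Serializable obj) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(obj);
    }
    return Base64.getEncoder().encodeToString(bos.toByteArray());
  }

  // The reverse trip; every static getLoadModel(conf) call pays this
  // deserialization cost because the conf holds only the encoded string.
  static Object decode(String encoded) throws IOException, ClassNotFoundException {
    byte[] bytes = Base64.getDecoder().decode(encoded);
    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return ois.readObject();
    }
  }
}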
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157333297

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java ---
[same file as above; earlier context elided]

+  public static CarbonLoadModel getLoadModel(Configuration conf) throws IOException {
+    CarbonLoadModel model;
+    String encodedString = conf.get(LOAD_MODEL);
+    if (encodedString != null) {
+      model = (CarbonLoadModel) ObjectSerializationUtil.convertStringToObject(encodedString);
+      return model;
+    }
+    model = new CarbonLoadModel();
+    CarbonProperties carbonProperty = CarbonProperties.getInstance();
+    model.setDatabaseName(CarbonTableOutputFormat.getDatabaseName(conf));
+    model.setTableName(CarbonTableOutputFormat.getTableName(conf));
+    model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(getOrCreateCarbonTable(conf)));
+    model.setTablePath(getTablePath(conf));
+    [null-format, bad-record and empty-line settings elided]
+    String complexDelim = conf.get(COMPLEX_DELIMITERS, "\\$" + "," + "\\:");
+    String[] split = complexDelim.split(",");
+    model.setComplexDelimiterLevel1(split[0]);
+    if (split.length > 1) {
+      model.setComplexDelimiterLevel1(split[1]);
+    }
+    model.setDateFormat(conf.get(DATE_FORMAT, carbonProperty
--- End diff --

ok

---
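Nearly every option read in getLoadModel() follows the same three-level precedence: a per-job Configuration value wins, else the process-wide CarbonProperties value, else a hard-coded default. A self-contained sketch of that lookup order — the key names and default value below are illustrative stand-ins for pairs like BAD_RECORDS_LOGGER_ACTION / CARBON_BAD_RECORDS_ACTION_DEFAULT, and java.util.Properties stands in for CarbonProperties:

import java.util.Properties;
import org.apache.hadoop.conf.Configuration;

class OptionPrecedenceSketch {
  // Stand-in keys and default (illustrative, not the exact Carbon values).
  static final String JOB_KEY = "mapreduce.carbonoutputformat.bad.records.logger.action";
  static final String GLOBAL_KEY = "carbon.bad.records.action";
  static final String HARD_DEFAULT = "FAIL";

  static String resolve(Configuration conf, Properties carbonProps) {
    // 1. per-job Configuration value; 2. else carbon.properties value;
    // 3. else the hard-coded default.
    return conf.get(JOB_KEY, carbonProps.getProperty(GLOBAL_KEY, HARD_DEFAULT));
  }
}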
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157333341

--- Diff: hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java ---
[new test file; ASF license header, imports and test body elided]

+public class CarbonOutputMapperTest extends TestCase {
+
+  CarbonLoadModel carbonLoadModel;
+  ...
+  private void runJob(String outPath)
+      throws Exception {
--- End diff --

it is testcase

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157333346

--- Diff: hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java ---
[same hunk as above, anchored on the same line]

+  private void runJob(String outPath)
+      throws Exception {
--- End diff --

ok

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157333357

--- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java ---
@@ -0,0 +1,118 @@
[new file; ASF license header elided]

+package org.apache.carbondata.processing.loading.iterator;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.apache.carbondata.common.CarbonIterator;
+
+public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> {
+
+  private boolean close = false;
+
+  /**
+   * Number of rows kept in memory at most will be batchSize * queue size
+   */
+  private int batchSize = 1000;
+
+  private RowBatch loadBatch = new RowBatch(batchSize);
+
+  private RowBatch readBatch;
+
+  private ArrayBlockingQueue<RowBatch> queue = new ArrayBlockingQueue<>(10);
+
+  public void write(String[] row) throws InterruptedException {
+
--- End diff --

ok

---
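For context on this class: the RecordWriter thread calls write() while the data-load thread consumes the same object as a CarbonIterator, so the ArrayBlockingQueue implements a bounded producer/consumer handoff. Below is a simplified single-row sketch of that pattern; the real class amortizes queue traffic by batching rows into RowBatch objects of batchSize 1000, and uses its close flag rather than a sentinel value:

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class RowHandoffSketch implements Iterator<String[]> {
  private static final String[] END = new String[0];  // sentinel marking close()
  private final BlockingQueue<String[]> queue = new ArrayBlockingQueue<>(10);
  private String[] next;

  // Producer side: called by the writer thread; blocks when the consumer lags.
  void write(String[] row) throws InterruptedException {
    queue.put(row);
  }

  // Producer side: signal end of input (no write() calls after this).
  void close() throws InterruptedException {
    queue.put(END);
  }

  // Consumer side: blocks until a row or the sentinel arrives.
  @Override public boolean hasNext() {
    if (next == null) {
      try {
        next = queue.take();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return next != END;
  }

  @Override public String[] next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    String[] row = next;
    next = null;
    return row;
  }
}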
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157333364

--- Diff: hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java ---
[same file as above; earlier context elided]

+  private void runJob(String outPath)
+      throws Exception {
+
+    Configuration configuration = new Configuration();
+    configuration.set("mapreduce.cluster.local.dir", new File(outPath + "1").getCanonicalPath());
+    Job job = Job.getInstance(configuration);
+    job.setJarByClass(CarbonOutputMapperTest.class);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(StringArrayWritable.class);
+    job.setMapperClass(Map.class);
+    job.setNumReduceTasks(0);
+
+    FileInputFormat.addInputPath(job, new Path(carbonLoadModel.getFactFilePath()));
+    CarbonTableOutputFormat.setLoadModel(job.getConfiguration(), carbonLoadModel);
+    CarbonTableOutputFormat.setCarbonTable(job.getConfiguration(),
+        carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable());
+    CSVInputFormat.setHeaderExtractionEnabled(job.getConfiguration(), true);
+    job.setInputFormatClass(CSVInputFormat.class);
+    job.setOutputFormatClass(CarbonTableOutputFormat.class);
+    CarbonUtil.deleteFoldersAndFiles(new File(carbonLoadModel.getTablePath() + "1"));
+    FileOutputFormat.setOutputPath(job, new Path(carbonLoadModel.getTablePath() + "1"));
+    job.getConfiguration().set("outpath", outPath);
+    job.getConfiguration().set("query.id", String.valueOf(System.nanoTime()));
+    job.waitForCompletion(true);
+  }
+
+  public static void main(String[] args) throws Exception {
+
+    CarbonOutputMapperTest carbonOutputMapperTest = new CarbonOutputMapperTest();
--- End diff --

it is testcase and removed main here

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157333401

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java ---
[same file as the first entries; earlier context elided]

+  @Override public void commitJob(JobContext context) throws IOException {
+    super.commitJob(context);
+    boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration());
+    CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration());
+    try {
+      LoadMetadataDetails newMetaEntry =
+          loadModel.getLoadMetadataDetails().get(loadModel.getLoadMetadataDetails().size() - 1);
+      CarbonLoaderUtil.populateNewLoadMetaEntry(newMetaEntry, SegmentStatus.SUCCESS,
+          loadModel.getFactTimeStamp(), true);
+      CarbonUtil.addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(),
--- End diff --

ok, moved

---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157333417 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java --- @@ -18,22 +18,311 @@ package org.apache.carbondata.hadoop.api; import java.io.IOException; +import java.util.List; +import java.util.Random; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordWriter; -import org.apache.hadoop.util.Progressable; +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.constants.CarbonLoadOptionConstants; +import org.apache.carbondata.core.metadata.datatype.StructField; +import org.apache.carbondata.core.metadata.datatype.StructType; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; +import org.apache.carbondata.processing.loading.DataLoadExecutor; +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable; +import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper; +import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * Base class for all output format for CarbonData file. 
- * @param <T> */ -public abstract class CarbonTableOutputFormat<T> extends FileOutputFormat<Void, T> { +public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, StringArrayWritable> { + + private static final String LOAD_MODEL = "mapreduce.carbonoutputformat.load.mmodel"; + private static final String DATABASE_NAME = "mapreduce.carbonoutputformat.databaseName"; + private static final String TABLE_NAME = "mapreduce.carbonoutputformat.tableName"; + private static final String TABLE = "mapreduce.carbonoutputformat.table"; + private static final String TABLE_PATH = "mapreduce.carbonoutputformat.tablepath"; + private static final String INPUT_SCHEMA = "mapreduce.carbonoutputformat.inputschema"; + private static final String TEMP_STORE_LOCATIONS = "carbon.load.tempstore.locations"; + private static final String OVERWRITE_SET = "carbon.load.set.overwrite"; + public static final String COMPLEX_DELIMITERS = "mapreduce.carbonoutputformat.complex_delimiters"; + public static final String SERIALIZATION_NULL_FORMAT = + "mapreduce.carbonoutputformat.serialization.null.format"; + public static final String BAD_RECORDS_LOGGER_ENABLE = + "mapreduce.carbonoutputformat.bad.records.logger.enable"; + public static final String BAD_RECORDS_LOGGER_ACTION = + "mapreduce.carbonoutputformat.bad.records.logger.action"; + public static final String IS_EMPTY_DATA_BAD_RECORD = + "mapreduce.carbonoutputformat.empty.data.bad.record"; + public static final String SKIP_EMPTY_LINE = "mapreduce.carbonoutputformat.skip.empty.line"; + public static final String SORT_SCOPE = "mapreduce.carbonoutputformat.load.sort.scope"; + public static final String BATCH_SORT_SIZE_INMB = + "mapreduce.carbonoutputformat.batch.sort.size.inmb"; + public static final String GLOBAL_SORT_PARTITIONS = + "mapreduce.carbonoutputformat.global.sort.partitions"; + public static final String BAD_RECORD_PATH = "mapreduce.carbonoutputformat.bad.record.path"; + public static final String DATE_FORMAT = "mapreduce.carbonoutputformat.date.format"; + public static final String TIMESTAMP_FORMAT = "mapreduce.carbonoutputformat.timestamp.format"; + public static final String IS_ONE_PASS_LOAD = "mapreduce.carbonoutputformat.one.pass.load"; + public static final String DICTIONARY_SERVER_HOST = + "mapreduce.carbonoutputformat.dict.server.host"; + public static final String DICTIONARY_SERVER_PORT = + "mapreduce.carbonoutputformat.dict.server.port"; + + private CarbonOutputCommitter committer; + + public static void setDatabaseName(Configuration configuration, String databaseName) { + if (null != databaseName) { + configuration.set(DATABASE_NAME, databaseName); + } + } + + public static String getDatabaseName(Configuration configuration) { + return configuration.get(DATABASE_NAME); + } + + public static void setTableName(Configuration configuration, String tableName) { + if (null != tableName) { + configuration.set(TABLE_NAME, tableName); + } + } + + public static String getTableName(Configuration configuration) { + return configuration.get(TABLE_NAME); + } + + public static void setTablePath(Configuration configuration, String tablePath) { + if (null != tablePath) { + configuration.set(TABLE_PATH, tablePath); + } + } + + public static String getTablePath(Configuration configuration) { + return configuration.get(TABLE_PATH); + } - @Override - public RecordWriter<Void, T> getRecordWriter(FileSystem ignored, JobConf job, String name, - Progressable progress) throws IOException { + public static void setCarbonTable(Configuration configuration, 
CarbonTable carbonTable) + throws IOException { + if (carbonTable != null) { + configuration.set(TABLE, + ObjectSerializationUtil.convertObjectToString(carbonTable.getTableInfo().serialize())); + } + } + + public static CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException { + CarbonTable carbonTable = null; + String encodedString = configuration.get(TABLE); + if (encodedString != null) { + byte[] bytes = (byte[]) ObjectSerializationUtil.convertStringToObject(encodedString); + TableInfo tableInfo = TableInfo.deserialize(bytes); + carbonTable = CarbonTable.buildFromTableInfo(tableInfo); + } + return carbonTable; + } + + public static void setLoadModel(Configuration configuration, CarbonLoadModel loadModel) + throws IOException { + if (loadModel != null) { + configuration.set(LOAD_MODEL, ObjectSerializationUtil.convertObjectToString(loadModel)); + } + } + + public static void setInputSchema(Configuration configuration, StructType inputSchema) + throws IOException { + if (inputSchema != null && inputSchema.getFields().size() > 0) { + configuration.set(INPUT_SCHEMA, ObjectSerializationUtil.convertObjectToString(inputSchema)); + } else { + throw new UnsupportedOperationException("Input schema must be set"); + } + } + + private static StructType getInputSchema(Configuration configuration) throws IOException { + String encodedString = configuration.get(INPUT_SCHEMA); + if (encodedString != null) { + return (StructType) ObjectSerializationUtil.convertStringToObject(encodedString); + } return null; } + + public static boolean isOverwriteSet(Configuration configuration) { + String overwrite = configuration.get(OVERWRITE_SET); + if (overwrite != null) { + return Boolean.parseBoolean(overwrite); + } + return false; + } + + public static void setOverwrite(Configuration configuration, boolean overwrite) { + configuration.set(OVERWRITE_SET, String.valueOf(overwrite)); + } + + public static void setTempStoreLocations(Configuration configuration, String[] tempLocations) + throws IOException { + if (tempLocations != null && tempLocations.length > 0) { + configuration + .set(TEMP_STORE_LOCATIONS, ObjectSerializationUtil.convertObjectToString(tempLocations)); + } + } + + private static String[] getTempStoreLocations(TaskAttemptContext taskAttemptContext) + throws IOException { + String encodedString = taskAttemptContext.getConfiguration().get(TEMP_STORE_LOCATIONS); + if (encodedString != null) { + return (String[]) ObjectSerializationUtil.convertStringToObject(encodedString); + } + return new String[] { + System.getProperty("java.io.tmpdir") + "/" + System.nanoTime() + "_" + taskAttemptContext + .getTaskAttemptID().toString() }; + } + + @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) + throws IOException { + if (this.committer == null) { + Path output = getOutputPath(context); + this.committer = new CarbonOutputCommitter(output, context); + } + --- End diff -- ok --- |
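All the setters quoted above follow one pattern: serialize an object, store the result as a string under a Configuration key, and decode it back on the task side. A rough sketch of that round trip using plain JDK serialization and Base64; CarbonData's ObjectSerializationUtil may encode (and compress) the bytes differently:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Base64;

final class ConfSerde {
  private ConfSerde() { }

  // Object -> Base64 string, suitable for Configuration.set(key, value).
  static String toConfString(Serializable obj) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(obj);
    }
    return Base64.getEncoder().encodeToString(bos.toByteArray());
  }

  // Base64 string -> Object, the inverse used when a task reads the key back.
  static Object fromConfString(String encoded) throws IOException, ClassNotFoundException {
    byte[] bytes = Base64.getDecoder().decode(encoded);
    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return ois.readObject();
    }
  }
}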
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157333418 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java --- @@ -18,22 +18,311 @@ [...] + private static String[] getTempStoreLocations(TaskAttemptContext taskAttemptContext) + throws IOException { + String encodedString = taskAttemptContext.getConfiguration().get(TEMP_STORE_LOCATIONS); + if (encodedString != null) { + return (String[]) ObjectSerializationUtil.convertStringToObject(encodedString); + } + return new String[] { + System.getProperty("java.io.tmpdir") + "/" + System.nanoTime() + "_" + taskAttemptContext + .getTaskAttemptID().toString() }; + } + + @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) --- End diff -- ok --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157333420 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java --- @@ -18,22 +18,311 @@ [...] + @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) + throws IOException { + if (this.committer == null) { + Path output = getOutputPath(context); + this.committer = new CarbonOutputCommitter(output, context); + } + + return this.committer; + } + + @Override public RecordWriter<NullWritable, StringArrayWritable> getRecordWriter( --- End diff -- ok --- |
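For context on the getRecordWriter comment: the writer's job is to hand each incoming row to an iterator that the data-load pipeline consumes on another thread. A hedged sketch of that shape, assuming StringArrayWritable exposes its row via get(); the real CarbonRecordWriter also sets up the DataLoadExecutor and temp store locations, which are omitted here:

import java.io.IOException;

import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

class SketchRecordWriter extends RecordWriter<NullWritable, StringArrayWritable> {
  private final CarbonOutputIteratorWrapper iterator;

  SketchRecordWriter(CarbonOutputIteratorWrapper iterator) {
    this.iterator = iterator;
  }

  @Override public void write(NullWritable key, StringArrayWritable value)
      throws IOException, InterruptedException {
    iterator.write(value.get()); // blocks when the queue of row batches is full
  }

  @Override public void close(TaskAttemptContext context) throws IOException {
    iterator.close(); // flushes the last partially filled batch to the queue
  }
}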
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157333626 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java --- @@ -18,22 +18,311 @@ [...] + @Override public RecordWriter<NullWritable, StringArrayWritable> getRecordWriter( + TaskAttemptContext taskAttemptContext) throws IOException { + final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration()); + loadModel.setTaskNo(new Random().nextInt(Integer.MAX_VALUE) + ""); --- End diff -- Now I get the task id from the task context instead of a random number. --- |
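The fix the reviewer describes amounts to deriving the task number from the attempt's TaskAttemptID rather than from Random, so retries of the same task reuse one task number. A small sketch of that idea; the exact expression in the merged patch may differ:

import org.apache.hadoop.mapreduce.TaskAttemptContext;

final class TaskNo {
  private TaskNo() { }

  // attempt_..._m_000003_0 -> "3": stable across attempts of the same task.
  static String fromContext(TaskAttemptContext context) {
    return String.valueOf(context.getTaskAttemptID().getTaskID().getId());
  }
}

In getRecordWriter, loadModel.setTaskNo(TaskNo.fromContext(taskAttemptContext)) would then replace the Random-based line quoted above.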
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157333697 --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.processing.loading.iterator; + +import java.util.concurrent.ArrayBlockingQueue; + +import org.apache.carbondata.common.CarbonIterator; + +public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> { + + private boolean close = false; + + /** + * Number of rows kept in memory at most will be batchSize * queue size + */ + private int batchSize = 1000; + + private RowBatch loadBatch = new RowBatch(batchSize); + + private RowBatch readBatch; + + private ArrayBlockingQueue<RowBatch> queue = new ArrayBlockingQueue<>(10); + + public void write(String[] row) throws InterruptedException { + + if (!loadBatch.addRow(row)) { + loadBatch.readyRead(); + queue.put(loadBatch); + loadBatch = new RowBatch(batchSize); + } + } + + @Override public boolean hasNext() { + return !queue.isEmpty() || !close || readBatch != null && readBatch.hasNext(); + } + + @Override public String[] next() { + if (readBatch == null || !readBatch.hasNext()) { + try { + readBatch = queue.take(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + return readBatch.next(); + } + + @Override public void close() { + if (loadBatch.isLoading()) { + try { + loadBatch.readyRead(); + queue.put(loadBatch); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + close = true; + } + + private static class RowBatch extends CarbonIterator<String[]> { + + private int counter; + + private String[][] batch; + + private int size; + + private boolean isLoading = true; + + public RowBatch(int size) { + batch = new String[size][]; + this.size = size; + } + + public boolean addRow(String[] row) { --- End diff -- ok --- |
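A minimal usage sketch of the wrapper quoted above: one thread writes rows and closes, another drains them through the CarbonIterator interface. The 2,500-row count is arbitrary; it just exercises a few full 1000-row batches plus a partial one:

import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper;

public class IteratorWrapperDemo {
  public static void main(String[] args) {
    final CarbonOutputIteratorWrapper wrapper = new CarbonOutputIteratorWrapper();

    // Producer side, e.g. the record writer of a map task.
    new Thread(() -> {
      try {
        for (int i = 0; i < 2500; i++) {
          wrapper.write(new String[] { "col-" + i });
        }
        wrapper.close(); // pushes the final partially filled batch
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }).start();

    // Consumer side, e.g. inside the data load executor.
    int rows = 0;
    while (wrapper.hasNext()) {
      wrapper.next();
      rows++;
    }
    System.out.println("consumed " + rows + " rows");
  }
}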
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157333728 --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java --- @@ -0,0 +1,118 @@ [...] + private static class RowBatch extends CarbonIterator<String[]> { + + private int counter; + + private String[][] batch; + + private int size; + + private boolean isLoading = true; + + public RowBatch(int size) { --- End diff -- ok --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157333850 --- Diff: processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java --- @@ -329,6 +329,47 @@ public static String readCurrentTime() { return date; } + public static void readAndUpdateLoadProgressInTableMeta(CarbonLoadModel model, + boolean insertOverwrite) throws IOException, InterruptedException { + LoadMetadataDetails newLoadMetaEntry = new LoadMetadataDetails(); + SegmentStatus status = SegmentStatus.INSERT_IN_PROGRESS; + if (insertOverwrite) { + status = SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS; + } + + // reading the start time of data load. + long loadStartTime = CarbonUpdateUtil.readCurrentTime(); + model.setFactTimeStamp(loadStartTime); + CarbonLoaderUtil + .populateNewLoadMetaEntry(newLoadMetaEntry, status, model.getFactTimeStamp(), false); + boolean entryAdded = + CarbonLoaderUtil.recordNewLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite); + if (!entryAdded) { + throw new IOException("Failed to add entry in table status for " + model.getTableName()); + } + } + + /** + * This method will update the load failure entry in the table status file + */ + public static void updateTableStatusForFailure(CarbonLoadModel model) + throws IOException, InterruptedException { + // in case if failure the load status should be "Marked for delete" so that it will be taken + // care during clean up + SegmentStatus loadStatus = SegmentStatus.MARKED_FOR_DELETE; + // always the last entry in the load metadata details will be the current load entry + LoadMetadataDetails loadMetaEntry = + model.getLoadMetadataDetails().get(model.getLoadMetadataDetails().size() - 1); --- End diff -- ok, added new method --- |
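The new method the reviewer mentions presumably wraps the "last entry is the current load" convention in one place. A hedged sketch; the actual method name and visibility in CarbonLoaderUtil may differ:

import java.util.List;

import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;

final class CurrentLoadEntry {
  private CurrentLoadEntry() { }

  // The load that is in progress is always the last entry in the
  // load metadata details carried by the model.
  static LoadMetadataDetails of(CarbonLoadModel model) {
    List<LoadMetadataDetails> details = model.getLoadMetadataDetails();
    return details.get(details.size() - 1);
  }
}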