GitHub user ravipesala opened a pull request:
https://github.com/apache/carbondata/pull/1642 [CARBONDATA-1855] Added outputformat to carbon Support standard Hadoop outputformat interface for carbon. It will be helpful for integrations with execution engines like Spark, Hive, and Presto. It should maintain segment management as well while writing the data to support the incremental loading feature. Be sure to do all of the following checklists to help us incorporate your contribution quickly and easily: - [X] Any interfaces changed? - [] Any backward compatibility impacted? - [X] Document update required? - [X] Testing done Tests added - [X] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ravipesala/incubator-carbondata carbon-outformat Alternatively you can review and apply these changes as the patch at: https://github.com/apache/carbondata/pull/1642.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1642 ---- commit 1b149b4f0d67e54257915f7c8b2b467fd7a3b04a Author: ravipesala <[hidden email]> Date: 2017-12-04T10:37:03Z Added outputformat for carbon ---- --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1642 SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2224/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1642 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1879/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1642 Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/649/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1642 Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/650/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1642 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1880/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1642 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2225/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1642 Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/724/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1642 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1952/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1642 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2278/ --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157121015 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java --- @@ -18,22 +18,311 @@ package org.apache.carbondata.hadoop.api; import java.io.IOException; +import java.util.List; +import java.util.Random; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordWriter; -import org.apache.hadoop.util.Progressable; +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.constants.CarbonLoadOptionConstants; +import org.apache.carbondata.core.metadata.datatype.StructField; +import org.apache.carbondata.core.metadata.datatype.StructType; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; +import org.apache.carbondata.processing.loading.DataLoadExecutor; +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable; +import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper; +import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * Base class for all output format for CarbonData file. 
--- End diff -- Add a description of what `Table` means; it should mean that this output format will write the dictionary file and carbondata file in a new segment. Let the user know it is more than a file. --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157121136 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.hadoop.api; + +import java.io.IOException; + +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import org.apache.carbondata.processing.util.CarbonLoaderUtil; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; + +/** + * Outputcommitter which manages the segments during loading. --- End diff -- Can you also describe what is committed here, it should include metadata like table status file --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157131983 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.hadoop.api; + +import java.io.IOException; + +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import org.apache.carbondata.processing.util.CarbonLoaderUtil; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; + +/** + * Outputcommitter which manages the segments during loading. + */ +public class CarbonOutputCommitter extends FileOutputCommitter { + + public CarbonOutputCommitter(Path outputPath, TaskAttemptContext context) throws IOException { + super(outputPath, context); + } + + /** + * Update the tablestatus with inprogress while setup the job. 
+ * + * @param context + * @throws IOException + */ + @Override public void setupJob(JobContext context) throws IOException { + super.setupJob(context); + boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration()); + CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration()); + try { + CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, overwriteSet); + } catch (InterruptedException e) { --- End diff -- I think `InterruptedException` is never thrown in `recordNewLoadMetadata`, please remove it --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157132129 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.hadoop.api; + +import java.io.IOException; + +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import org.apache.carbondata.processing.util.CarbonLoaderUtil; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; + +/** + * Outputcommitter which manages the segments during loading. + */ +public class CarbonOutputCommitter extends FileOutputCommitter { + + public CarbonOutputCommitter(Path outputPath, TaskAttemptContext context) throws IOException { + super(outputPath, context); + } + + /** + * Update the tablestatus with inprogress while setup the job. 
+ * + * @param context + * @throws IOException + */ + @Override public void setupJob(JobContext context) throws IOException { + super.setupJob(context); + boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration()); + CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration()); + try { + CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, overwriteSet); + } catch (InterruptedException e) { + throw new IOException(e); + } + CarbonTableOutputFormat.setLoadModel(context.getConfiguration(), loadModel); + } + + /** + * Update the tablestatus as success after job is success + * + * @param context + * @throws IOException + */ + @Override public void commitJob(JobContext context) throws IOException { + super.commitJob(context); + boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration()); + CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration()); + try { + LoadMetadataDetails newMetaEntry = + loadModel.getLoadMetadataDetails().get(loadModel.getLoadMetadataDetails().size() - 1); + CarbonLoaderUtil.populateNewLoadMetaEntry(newMetaEntry, SegmentStatus.SUCCESS, + loadModel.getFactTimeStamp(), true); + CarbonUtil.addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(), + loadModel.getCarbonDataLoadSchema().getCarbonTable()); + CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, overwriteSet); + } catch (InterruptedException e) { --- End diff -- This can be removed --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157132915 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.hadoop.api; + +import java.io.IOException; + +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import org.apache.carbondata.processing.util.CarbonLoaderUtil; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; + +/** + * Outputcommitter which manages the segments during loading. + */ +public class CarbonOutputCommitter extends FileOutputCommitter { + + public CarbonOutputCommitter(Path outputPath, TaskAttemptContext context) throws IOException { + super(outputPath, context); + } + + /** + * Update the tablestatus with inprogress while setup the job. 
+ * + * @param context + * @throws IOException + */ + @Override public void setupJob(JobContext context) throws IOException { + super.setupJob(context); + boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration()); + CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration()); + try { + CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, overwriteSet); + } catch (InterruptedException e) { + throw new IOException(e); + } + CarbonTableOutputFormat.setLoadModel(context.getConfiguration(), loadModel); + } + + /** + * Update the tablestatus as success after job is success + * + * @param context + * @throws IOException + */ + @Override public void commitJob(JobContext context) throws IOException { + super.commitJob(context); + boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration()); + CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration()); + try { + LoadMetadataDetails newMetaEntry = + loadModel.getLoadMetadataDetails().get(loadModel.getLoadMetadataDetails().size() - 1); + CarbonLoaderUtil.populateNewLoadMetaEntry(newMetaEntry, SegmentStatus.SUCCESS, + loadModel.getFactTimeStamp(), true); + CarbonUtil.addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(), + loadModel.getCarbonDataLoadSchema().getCarbonTable()); + CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, overwriteSet); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + /** + * Update the tablestatus as fail if any fail happens. 
+ * + * @param context + * @param state + * @throws IOException + */ + @Override public void abortJob(JobContext context, JobStatus.State state) throws IOException { + super.abortJob(context, state); + CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration()); + try { + CarbonLoaderUtil.updateTableStatusForFailure(loadModel); + } catch (InterruptedException e) { --- End diff -- This can be removed --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157133035 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.hadoop.api; + +import java.io.IOException; + +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import org.apache.carbondata.processing.util.CarbonLoaderUtil; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; + +/** + * Outputcommitter which manages the segments during loading. + */ +public class CarbonOutputCommitter extends FileOutputCommitter { + + public CarbonOutputCommitter(Path outputPath, TaskAttemptContext context) throws IOException { + super(outputPath, context); + } + + /** + * Update the tablestatus with inprogress while setup the job. 
+ * + * @param context + * @throws IOException + */ + @Override public void setupJob(JobContext context) throws IOException { + super.setupJob(context); + boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration()); + CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration()); + try { + CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, overwriteSet); + } catch (InterruptedException e) { + throw new IOException(e); + } + CarbonTableOutputFormat.setLoadModel(context.getConfiguration(), loadModel); + } + + /** + * Update the tablestatus as success after job is success + * + * @param context + * @throws IOException + */ + @Override public void commitJob(JobContext context) throws IOException { + super.commitJob(context); + boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration()); + CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration()); + try { + LoadMetadataDetails newMetaEntry = + loadModel.getLoadMetadataDetails().get(loadModel.getLoadMetadataDetails().size() - 1); + CarbonLoaderUtil.populateNewLoadMetaEntry(newMetaEntry, SegmentStatus.SUCCESS, + loadModel.getFactTimeStamp(), true); + CarbonUtil.addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(), + loadModel.getCarbonDataLoadSchema().getCarbonTable()); + CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, overwriteSet); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + /** + * Update the tablestatus as fail if any fail happens. + * + * @param context + * @param state + * @throws IOException + */ + @Override public void abortJob(JobContext context, JobStatus.State state) throws IOException { --- End diff -- I think it is better to log the state in this function --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157133238 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java --- @@ -18,22 +18,311 @@ package org.apache.carbondata.hadoop.api; import java.io.IOException; +import java.util.List; +import java.util.Random; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordWriter; -import org.apache.hadoop.util.Progressable; +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.constants.CarbonLoadOptionConstants; +import org.apache.carbondata.core.metadata.datatype.StructField; +import org.apache.carbondata.core.metadata.datatype.StructType; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; +import org.apache.carbondata.processing.loading.DataLoadExecutor; +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable; +import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper; +import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * Base class for all output format for CarbonData file. 
- * @param <T> */ -public abstract class CarbonTableOutputFormat<T> extends FileOutputFormat<Void, T> { +public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, StringArrayWritable> { + + private static final String LOAD_MODEL = "mapreduce.carbonoutputformat.load.mmodel"; + private static final String DATABASE_NAME = "mapreduce.carbonoutputformat.databaseName"; + private static final String TABLE_NAME = "mapreduce.carbonoutputformat.tableName"; + private static final String TABLE = "mapreduce.carbonoutputformat.table"; + private static final String TABLE_PATH = "mapreduce.carbonoutputformat.tablepath"; + private static final String INPUT_SCHEMA = "mapreduce.carbonoutputformat.inputschema"; + private static final String TEMP_STORE_LOCATIONS = "carbon.load.tempstore.locations"; + private static final String OVERWRITE_SET = "carbon.load.set.overwrite"; --- End diff -- Why this name start with carbon, it should start with mapreduce, right? --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157133369 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java --- @@ -18,22 +18,311 @@ package org.apache.carbondata.hadoop.api; import java.io.IOException; +import java.util.List; +import java.util.Random; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordWriter; -import org.apache.hadoop.util.Progressable; +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.constants.CarbonLoadOptionConstants; +import org.apache.carbondata.core.metadata.datatype.StructField; +import org.apache.carbondata.core.metadata.datatype.StructType; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; +import org.apache.carbondata.processing.loading.DataLoadExecutor; +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable; +import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper; +import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * Base class for all output format for CarbonData file. 
- * @param <T> */ -public abstract class CarbonTableOutputFormat<T> extends FileOutputFormat<Void, T> { +public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, StringArrayWritable> { + + private static final String LOAD_MODEL = "mapreduce.carbonoutputformat.load.mmodel"; --- End diff -- I think it is better to change all configurations name to pattern: `mapreduce.carbontable.xxx`, since it is for CarbonTableOutputFormat, not for file --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157133386 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java --- @@ -18,22 +18,311 @@ package org.apache.carbondata.hadoop.api; import java.io.IOException; +import java.util.List; +import java.util.Random; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordWriter; -import org.apache.hadoop.util.Progressable; +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.constants.CarbonLoadOptionConstants; +import org.apache.carbondata.core.metadata.datatype.StructField; +import org.apache.carbondata.core.metadata.datatype.StructType; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; +import org.apache.carbondata.processing.loading.DataLoadExecutor; +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable; +import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper; +import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * Base class for all output format for CarbonData file. 
- * @param <T> */ -public abstract class CarbonTableOutputFormat<T> extends FileOutputFormat<Void, T> { +public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, StringArrayWritable> { + + private static final String LOAD_MODEL = "mapreduce.carbonoutputformat.load.mmodel"; --- End diff -- typo in model --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1642#discussion_r157136520 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java --- @@ -18,22 +18,311 @@ package org.apache.carbondata.hadoop.api; import java.io.IOException; +import java.util.List; +import java.util.Random; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordWriter; -import org.apache.hadoop.util.Progressable; +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.constants.CarbonLoadOptionConstants; +import org.apache.carbondata.core.metadata.datatype.StructField; +import org.apache.carbondata.core.metadata.datatype.StructType; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; +import org.apache.carbondata.processing.loading.DataLoadExecutor; +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable; +import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper; +import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * Base class for all output format for CarbonData file. 
- * @param <T> */ -public abstract class CarbonTableOutputFormat<T> extends FileOutputFormat<Void, T> { +public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, StringArrayWritable> { + + private static final String LOAD_MODEL = "mapreduce.carbonoutputformat.load.mmodel"; + private static final String DATABASE_NAME = "mapreduce.carbonoutputformat.databaseName"; + private static final String TABLE_NAME = "mapreduce.carbonoutputformat.tableName"; + private static final String TABLE = "mapreduce.carbonoutputformat.table"; + private static final String TABLE_PATH = "mapreduce.carbonoutputformat.tablepath"; + private static final String INPUT_SCHEMA = "mapreduce.carbonoutputformat.inputschema"; + private static final String TEMP_STORE_LOCATIONS = "carbon.load.tempstore.locations"; + private static final String OVERWRITE_SET = "carbon.load.set.overwrite"; + public static final String COMPLEX_DELIMITERS = "mapreduce.carbonoutputformat.complex_delimiters"; + public static final String SERIALIZATION_NULL_FORMAT = + "mapreduce.carbonoutputformat.serialization.null.format"; + public static final String BAD_RECORDS_LOGGER_ENABLE = + "mapreduce.carbonoutputformat.bad.records.logger.enable"; + public static final String BAD_RECORDS_LOGGER_ACTION = + "mapreduce.carbonoutputformat.bad.records.logger.action"; + public static final String IS_EMPTY_DATA_BAD_RECORD = + "mapreduce.carbonoutputformat.empty.data.bad.record"; + public static final String SKIP_EMPTY_LINE = "mapreduce.carbonoutputformat.skip.empty.line"; + public static final String SORT_SCOPE = "mapreduce.carbonoutputformat.load.sort.scope"; + public static final String BATCH_SORT_SIZE_INMB = + "mapreduce.carbonoutputformat.batch.sort.size.inmb"; + public static final String GLOBAL_SORT_PARTITIONS = + "mapreduce.carbonoutputformat.global.sort.partitions"; + public static final String BAD_RECORD_PATH = "mapreduce.carbonoutputformat.bad.record.path"; + public static final String DATE_FORMAT = 
"mapreduce.carbonoutputformat.date.format"; + public static final String TIMESTAMP_FORMAT = "mapreduce.carbonoutputformat.timestamp.format"; + public static final String IS_ONE_PASS_LOAD = "mapreduce.carbonoutputformat.one.pass.load"; + public static final String DICTIONARY_SERVER_HOST = + "mapreduce.carbonoutputformat.dict.server.host"; + public static final String DICTIONARY_SERVER_PORT = + "mapreduce.carbonoutputformat.dict.server.port"; + + private CarbonOutputCommitter committer; + + public static void setDatabaseName(Configuration configuration, String databaseName) { + if (null != databaseName) { + configuration.set(DATABASE_NAME, databaseName); + } + } + + public static String getDatabaseName(Configuration configuration) { + return configuration.get(DATABASE_NAME); + } + + public static void setTableName(Configuration configuration, String tableName) { + if (null != tableName) { + configuration.set(TABLE_NAME, tableName); + } + } + + public static String getTableName(Configuration configuration) { + return configuration.get(TABLE_NAME); + } + + public static void setTablePath(Configuration configuration, String tablePath) { + if (null != tablePath) { + configuration.set(TABLE_PATH, tablePath); + } + } + + public static String getTablePath(Configuration configuration) { + return configuration.get(TABLE_PATH); + } - @Override - public RecordWriter<Void, T> getRecordWriter(FileSystem ignored, JobConf job, String name, - Progressable progress) throws IOException { + public static void setCarbonTable(Configuration configuration, CarbonTable carbonTable) + throws IOException { + if (carbonTable != null) { + configuration.set(TABLE, + ObjectSerializationUtil.convertObjectToString(carbonTable.getTableInfo().serialize())); + } + } + + public static CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException { + CarbonTable carbonTable = null; + String encodedString = configuration.get(TABLE); + if (encodedString != null) { + byte[] bytes = 
(byte[]) ObjectSerializationUtil.convertStringToObject(encodedString); + TableInfo tableInfo = TableInfo.deserialize(bytes); + carbonTable = CarbonTable.buildFromTableInfo(tableInfo); + } + return carbonTable; + } + + public static void setLoadModel(Configuration configuration, CarbonLoadModel loadModel) + throws IOException { + if (loadModel != null) { + configuration.set(LOAD_MODEL, ObjectSerializationUtil.convertObjectToString(loadModel)); + } + } + + public static void setInputSchema(Configuration configuration, StructType inputSchema) + throws IOException { + if (inputSchema != null && inputSchema.getFields().size() > 0) { + configuration.set(INPUT_SCHEMA, ObjectSerializationUtil.convertObjectToString(inputSchema)); + } else { + throw new UnsupportedOperationException("Input schema must be set"); + } + } + + private static StructType getInputSchema(Configuration configuration) throws IOException { + String encodedString = configuration.get(INPUT_SCHEMA); + if (encodedString != null) { + return (StructType) ObjectSerializationUtil.convertStringToObject(encodedString); + } return null; } + + public static boolean isOverwriteSet(Configuration configuration) { + String overwrite = configuration.get(OVERWRITE_SET); + if (overwrite != null) { + return Boolean.parseBoolean(overwrite); + } + return false; + } + + public static void setOverwrite(Configuration configuration, boolean overwrite) { + configuration.set(OVERWRITE_SET, String.valueOf(overwrite)); + } + + public static void setTempStoreLocations(Configuration configuration, String[] tempLocations) + throws IOException { + if (tempLocations != null && tempLocations.length > 0) { + configuration + .set(TEMP_STORE_LOCATIONS, ObjectSerializationUtil.convertObjectToString(tempLocations)); + } + } + + private static String[] getTempStoreLocations(TaskAttemptContext taskAttemptContext) + throws IOException { + String encodedString = taskAttemptContext.getConfiguration().get(TEMP_STORE_LOCATIONS); + if (encodedString 
!= null) { + return (String[]) ObjectSerializationUtil.convertStringToObject(encodedString); + } + return new String[] { + System.getProperty("java.io.tmpdir") + "/" + System.nanoTime() + "_" + taskAttemptContext + .getTaskAttemptID().toString() }; + } + + @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) + throws IOException { + if (this.committer == null) { + Path output = getOutputPath(context); + this.committer = new CarbonOutputCommitter(output, context); + } + + return this.committer; + } + + @Override public RecordWriter<NullWritable, StringArrayWritable> getRecordWriter( + TaskAttemptContext taskAttemptContext) throws IOException { + final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration()); + loadModel.setTaskNo(new Random().nextInt(Integer.MAX_VALUE) + ""); + final String[] tempStoreLocations = getTempStoreLocations(taskAttemptContext); + final CarbonOutputIteratorWrapper iteratorWrapper = new CarbonOutputIteratorWrapper(); + final DataLoadExecutor dataLoadExecutor = new DataLoadExecutor(); + CarbonRecordWriter recordWriter = new CarbonRecordWriter(iteratorWrapper, dataLoadExecutor); + new Thread() { + @Override public void run() { + try { + dataLoadExecutor + .execute(loadModel, tempStoreLocations, new CarbonIterator[] { iteratorWrapper }); + } catch (Exception e) { + dataLoadExecutor.close(); + throw new RuntimeException(e); + } + } + }.start(); + + return recordWriter; + } + + public static CarbonLoadModel getLoadModel(Configuration conf) throws IOException { + CarbonLoadModel model; --- End diff -- Can we cache this model locally in this class, like `getOrCreate` usage --- |
Free forum by Nabble | Edit this page |