[GitHub] carbondata pull request #1642: [CARBONDATA-1855] Added outputformat to carbo...

classic Classic list List threaded Threaded
77 messages Options
1234
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1642: [CARBONDATA-1855][PARTITION] Added outputform...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1642#discussion_r157332993
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.hadoop.api;
    +
    +import java.io.IOException;
    +
    +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
    +import org.apache.carbondata.core.statusmanager.SegmentStatus;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    +import org.apache.carbondata.processing.util.CarbonLoaderUtil;
    +
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.mapreduce.JobContext;
    +import org.apache.hadoop.mapreduce.JobStatus;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
    +
    +/**
    + * Outputcommitter which manages the segments during loading.
    + */
    +public class CarbonOutputCommitter extends FileOutputCommitter {
    +
    +  public CarbonOutputCommitter(Path outputPath, TaskAttemptContext context) throws IOException {
    +    super(outputPath, context);
    +  }
    +
    +  /**
    +   * Update the tablestatus with inprogress while setup the job.
    +   *
    +   * @param context
    +   * @throws IOException
    +   */
    +  @Override public void setupJob(JobContext context) throws IOException {
    +    super.setupJob(context);
    +    boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration());
    +    CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration());
    +    try {
    +      CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, overwriteSet);
    +    } catch (InterruptedException e) {
    --- End diff --
   
    ok


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1642: [CARBONDATA-1855][PARTITION] Added outputform...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1642#discussion_r157333001
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.hadoop.api;
    +
    +import java.io.IOException;
    +
    +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
    +import org.apache.carbondata.core.statusmanager.SegmentStatus;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    +import org.apache.carbondata.processing.util.CarbonLoaderUtil;
    +
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.mapreduce.JobContext;
    +import org.apache.hadoop.mapreduce.JobStatus;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
    +
    +/**
    + * Outputcommitter which manages the segments during loading.
    + */
    +public class CarbonOutputCommitter extends FileOutputCommitter {
    +
    +  public CarbonOutputCommitter(Path outputPath, TaskAttemptContext context) throws IOException {
    +    super(outputPath, context);
    +  }
    +
    +  /**
    +   * Update the tablestatus with inprogress while setup the job.
    +   *
    +   * @param context
    +   * @throws IOException
    +   */
    +  @Override public void setupJob(JobContext context) throws IOException {
    +    super.setupJob(context);
    +    boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration());
    +    CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration());
    +    try {
    +      CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, overwriteSet);
    +    } catch (InterruptedException e) {
    +      throw new IOException(e);
    +    }
    +    CarbonTableOutputFormat.setLoadModel(context.getConfiguration(), loadModel);
    +  }
    +
    +  /**
    +   * Update the tablestatus as success after job is success
    +   *
    +   * @param context
    +   * @throws IOException
    +   */
    +  @Override public void commitJob(JobContext context) throws IOException {
    +    super.commitJob(context);
    +    boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration());
    +    CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration());
    +    try {
    +      LoadMetadataDetails newMetaEntry =
    +          loadModel.getLoadMetadataDetails().get(loadModel.getLoadMetadataDetails().size() - 1);
    +      CarbonLoaderUtil.populateNewLoadMetaEntry(newMetaEntry, SegmentStatus.SUCCESS,
    +          loadModel.getFactTimeStamp(), true);
    +      CarbonUtil.addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(),
    +          loadModel.getCarbonDataLoadSchema().getCarbonTable());
    +      CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, overwriteSet);
    +    } catch (InterruptedException e) {
    --- End diff --
   
    ok


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1642: [CARBONDATA-1855][PARTITION] Added outputform...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1642#discussion_r157333012
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.hadoop.api;
    +
    +import java.io.IOException;
    +
    +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
    +import org.apache.carbondata.core.statusmanager.SegmentStatus;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    +import org.apache.carbondata.processing.util.CarbonLoaderUtil;
    +
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.mapreduce.JobContext;
    +import org.apache.hadoop.mapreduce.JobStatus;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
    +
    +/**
    + * Outputcommitter which manages the segments during loading.
    + */
    +public class CarbonOutputCommitter extends FileOutputCommitter {
    +
    +  public CarbonOutputCommitter(Path outputPath, TaskAttemptContext context) throws IOException {
    +    super(outputPath, context);
    +  }
    +
    +  /**
    +   * Update the tablestatus with inprogress while setup the job.
    +   *
    +   * @param context
    +   * @throws IOException
    +   */
    +  @Override public void setupJob(JobContext context) throws IOException {
    +    super.setupJob(context);
    +    boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration());
    +    CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration());
    +    try {
    +      CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, overwriteSet);
    +    } catch (InterruptedException e) {
    +      throw new IOException(e);
    +    }
    +    CarbonTableOutputFormat.setLoadModel(context.getConfiguration(), loadModel);
    +  }
    +
    +  /**
    +   * Update the tablestatus as success after job is success
    +   *
    +   * @param context
    +   * @throws IOException
    +   */
    +  @Override public void commitJob(JobContext context) throws IOException {
    +    super.commitJob(context);
    +    boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration());
    +    CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration());
    +    try {
    +      LoadMetadataDetails newMetaEntry =
    +          loadModel.getLoadMetadataDetails().get(loadModel.getLoadMetadataDetails().size() - 1);
    +      CarbonLoaderUtil.populateNewLoadMetaEntry(newMetaEntry, SegmentStatus.SUCCESS,
    +          loadModel.getFactTimeStamp(), true);
    +      CarbonUtil.addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(),
    +          loadModel.getCarbonDataLoadSchema().getCarbonTable());
    +      CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, overwriteSet);
    +    } catch (InterruptedException e) {
    +      throw new IOException(e);
    +    }
    +  }
    +
    +  /**
    +   * Update the tablestatus as fail if any fail happens.
    +   *
    +   * @param context
    +   * @param state
    +   * @throws IOException
    +   */
    +  @Override public void abortJob(JobContext context, JobStatus.State state) throws IOException {
    +    super.abortJob(context, state);
    +    CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration());
    +    try {
    +      CarbonLoaderUtil.updateTableStatusForFailure(loadModel);
    +    } catch (InterruptedException e) {
    --- End diff --
   
    ok


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1642: [CARBONDATA-1855][PARTITION] Added outputform...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1642#discussion_r157333060
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.hadoop.api;
    +
    +import java.io.IOException;
    +
    +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
    +import org.apache.carbondata.core.statusmanager.SegmentStatus;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    +import org.apache.carbondata.processing.util.CarbonLoaderUtil;
    +
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.mapreduce.JobContext;
    +import org.apache.hadoop.mapreduce.JobStatus;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
    +
    +/**
    + * Outputcommitter which manages the segments during loading.
    + */
    +public class CarbonOutputCommitter extends FileOutputCommitter {
    +
    +  public CarbonOutputCommitter(Path outputPath, TaskAttemptContext context) throws IOException {
    +    super(outputPath, context);
    +  }
    +
    +  /**
    +   * Update the tablestatus with inprogress while setup the job.
    +   *
    +   * @param context
    +   * @throws IOException
    +   */
    +  @Override public void setupJob(JobContext context) throws IOException {
    +    super.setupJob(context);
    +    boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration());
    +    CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration());
    +    try {
    +      CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, overwriteSet);
    +    } catch (InterruptedException e) {
    +      throw new IOException(e);
    +    }
    +    CarbonTableOutputFormat.setLoadModel(context.getConfiguration(), loadModel);
    +  }
    +
    +  /**
    +   * Update the tablestatus as success after job is success
    +   *
    +   * @param context
    +   * @throws IOException
    +   */
    +  @Override public void commitJob(JobContext context) throws IOException {
    +    super.commitJob(context);
    +    boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration());
    +    CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration());
    +    try {
    +      LoadMetadataDetails newMetaEntry =
    +          loadModel.getLoadMetadataDetails().get(loadModel.getLoadMetadataDetails().size() - 1);
    +      CarbonLoaderUtil.populateNewLoadMetaEntry(newMetaEntry, SegmentStatus.SUCCESS,
    +          loadModel.getFactTimeStamp(), true);
    +      CarbonUtil.addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(),
    +          loadModel.getCarbonDataLoadSchema().getCarbonTable());
    +      CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, overwriteSet);
    +    } catch (InterruptedException e) {
    +      throw new IOException(e);
    +    }
    +  }
    +
    +  /**
    +   * Update the tablestatus as fail if any fail happens.
    +   *
    +   * @param context
    +   * @param state
    +   * @throws IOException
    +   */
    +  @Override public void abortJob(JobContext context, JobStatus.State state) throws IOException {
    --- End diff --
   
    ok


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1642: [CARBONDATA-1855][PARTITION] Added outputform...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1642#discussion_r157333109
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java ---
    @@ -18,22 +18,311 @@
     package org.apache.carbondata.hadoop.api;
     
     import java.io.IOException;
    +import java.util.List;
    +import java.util.Random;
     
    -import org.apache.hadoop.fs.FileSystem;
    -import org.apache.hadoop.mapred.FileOutputFormat;
    -import org.apache.hadoop.mapred.JobConf;
    -import org.apache.hadoop.mapred.RecordWriter;
    -import org.apache.hadoop.util.Progressable;
    +import org.apache.carbondata.common.CarbonIterator;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
    +import org.apache.carbondata.core.metadata.datatype.StructField;
    +import org.apache.carbondata.core.metadata.datatype.StructType;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.TableInfo;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
    +import org.apache.carbondata.processing.loading.DataLoadExecutor;
    +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
    +import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper;
    +import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.NullWritable;
    +import org.apache.hadoop.mapreduce.OutputCommitter;
    +import org.apache.hadoop.mapreduce.RecordWriter;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     
     /**
      * Base class for all output format for CarbonData file.
    - * @param <T>
      */
    -public abstract class CarbonTableOutputFormat<T> extends FileOutputFormat<Void, T> {
    +public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, StringArrayWritable> {
    +
    +  private static final String LOAD_MODEL = "mapreduce.carbonoutputformat.load.mmodel";
    --- End diff --
   
    ok


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1642: [CARBONDATA-1855][PARTITION] Added outputform...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1642#discussion_r157333123
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java ---
    @@ -18,22 +18,311 @@
     package org.apache.carbondata.hadoop.api;
     
     import java.io.IOException;
    +import java.util.List;
    +import java.util.Random;
     
    -import org.apache.hadoop.fs.FileSystem;
    -import org.apache.hadoop.mapred.FileOutputFormat;
    -import org.apache.hadoop.mapred.JobConf;
    -import org.apache.hadoop.mapred.RecordWriter;
    -import org.apache.hadoop.util.Progressable;
    +import org.apache.carbondata.common.CarbonIterator;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
    +import org.apache.carbondata.core.metadata.datatype.StructField;
    +import org.apache.carbondata.core.metadata.datatype.StructType;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.TableInfo;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
    +import org.apache.carbondata.processing.loading.DataLoadExecutor;
    +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
    +import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper;
    +import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.NullWritable;
    +import org.apache.hadoop.mapreduce.OutputCommitter;
    +import org.apache.hadoop.mapreduce.RecordWriter;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     
     /**
      * Base class for all output format for CarbonData file.
    - * @param <T>
      */
    -public abstract class CarbonTableOutputFormat<T> extends FileOutputFormat<Void, T> {
    +public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, StringArrayWritable> {
    +
    +  private static final String LOAD_MODEL = "mapreduce.carbonoutputformat.load.mmodel";
    --- End diff --
   
    ok


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1642: [CARBONDATA-1855][PARTITION] Added outputform...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1642#discussion_r157333237
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java ---
    @@ -18,22 +18,311 @@
     package org.apache.carbondata.hadoop.api;
     
     import java.io.IOException;
    +import java.util.List;
    +import java.util.Random;
     
    -import org.apache.hadoop.fs.FileSystem;
    -import org.apache.hadoop.mapred.FileOutputFormat;
    -import org.apache.hadoop.mapred.JobConf;
    -import org.apache.hadoop.mapred.RecordWriter;
    -import org.apache.hadoop.util.Progressable;
    +import org.apache.carbondata.common.CarbonIterator;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
    +import org.apache.carbondata.core.metadata.datatype.StructField;
    +import org.apache.carbondata.core.metadata.datatype.StructType;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.TableInfo;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
    +import org.apache.carbondata.processing.loading.DataLoadExecutor;
    +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
    +import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper;
    +import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.NullWritable;
    +import org.apache.hadoop.mapreduce.OutputCommitter;
    +import org.apache.hadoop.mapreduce.RecordWriter;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     
     /**
      * Base class for all output format for CarbonData file.
    - * @param <T>
      */
    -public abstract class CarbonTableOutputFormat<T> extends FileOutputFormat<Void, T> {
    +public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, StringArrayWritable> {
    +
    +  private static final String LOAD_MODEL = "mapreduce.carbonoutputformat.load.mmodel";
    +  private static final String DATABASE_NAME = "mapreduce.carbonoutputformat.databaseName";
    +  private static final String TABLE_NAME = "mapreduce.carbonoutputformat.tableName";
    +  private static final String TABLE = "mapreduce.carbonoutputformat.table";
    +  private static final String TABLE_PATH = "mapreduce.carbonoutputformat.tablepath";
    +  private static final String INPUT_SCHEMA = "mapreduce.carbonoutputformat.inputschema";
    +  private static final String TEMP_STORE_LOCATIONS = "carbon.load.tempstore.locations";
    +  private static final String OVERWRITE_SET = "carbon.load.set.overwrite";
    +  public static final String COMPLEX_DELIMITERS = "mapreduce.carbonoutputformat.complex_delimiters";
    +  public static final String SERIALIZATION_NULL_FORMAT =
    +      "mapreduce.carbonoutputformat.serialization.null.format";
    +  public static final String BAD_RECORDS_LOGGER_ENABLE =
    +      "mapreduce.carbonoutputformat.bad.records.logger.enable";
    +  public static final String BAD_RECORDS_LOGGER_ACTION =
    +      "mapreduce.carbonoutputformat.bad.records.logger.action";
    +  public static final String IS_EMPTY_DATA_BAD_RECORD =
    +      "mapreduce.carbonoutputformat.empty.data.bad.record";
    +  public static final String SKIP_EMPTY_LINE = "mapreduce.carbonoutputformat.skip.empty.line";
    +  public static final String SORT_SCOPE = "mapreduce.carbonoutputformat.load.sort.scope";
    +  public static final String BATCH_SORT_SIZE_INMB =
    +      "mapreduce.carbonoutputformat.batch.sort.size.inmb";
    +  public static final String GLOBAL_SORT_PARTITIONS =
    +      "mapreduce.carbonoutputformat.global.sort.partitions";
    +  public static final String BAD_RECORD_PATH = "mapreduce.carbonoutputformat.bad.record.path";
    +  public static final String DATE_FORMAT = "mapreduce.carbonoutputformat.date.format";
    +  public static final String TIMESTAMP_FORMAT = "mapreduce.carbonoutputformat.timestamp.format";
    +  public static final String IS_ONE_PASS_LOAD = "mapreduce.carbonoutputformat.one.pass.load";
    +  public static final String DICTIONARY_SERVER_HOST =
    +      "mapreduce.carbonoutputformat.dict.server.host";
    +  public static final String DICTIONARY_SERVER_PORT =
    +      "mapreduce.carbonoutputformat.dict.server.port";
    +
    +  private CarbonOutputCommitter committer;
    +
    +  public static void setDatabaseName(Configuration configuration, String databaseName) {
    +    if (null != databaseName) {
    +      configuration.set(DATABASE_NAME, databaseName);
    +    }
    +  }
    +
    +  public static String getDatabaseName(Configuration configuration) {
    +    return configuration.get(DATABASE_NAME);
    +  }
    +
    +  public static void setTableName(Configuration configuration, String tableName) {
    +    if (null != tableName) {
    +      configuration.set(TABLE_NAME, tableName);
    +    }
    +  }
    +
    +  public static String getTableName(Configuration configuration) {
    +    return configuration.get(TABLE_NAME);
    +  }
    +
    +  public static void setTablePath(Configuration configuration, String tablePath) {
    +    if (null != tablePath) {
    +      configuration.set(TABLE_PATH, tablePath);
    +    }
    +  }
    +
    +  public static String getTablePath(Configuration configuration) {
    +    return configuration.get(TABLE_PATH);
    +  }
     
    -  @Override
    -  public RecordWriter<Void, T> getRecordWriter(FileSystem ignored, JobConf job, String name,
    -      Progressable progress) throws IOException {
    +  public static void setCarbonTable(Configuration configuration, CarbonTable carbonTable)
    +      throws IOException {
    +    if (carbonTable != null) {
    +      configuration.set(TABLE,
    +          ObjectSerializationUtil.convertObjectToString(carbonTable.getTableInfo().serialize()));
    +    }
    +  }
    +
    +  public static CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
    +    CarbonTable carbonTable = null;
    +    String encodedString = configuration.get(TABLE);
    +    if (encodedString != null) {
    +      byte[] bytes = (byte[]) ObjectSerializationUtil.convertStringToObject(encodedString);
    +      TableInfo tableInfo = TableInfo.deserialize(bytes);
    +      carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
    +    }
    +    return carbonTable;
    +  }
    +
    +  public static void setLoadModel(Configuration configuration, CarbonLoadModel loadModel)
    +      throws IOException {
    +    if (loadModel != null) {
    +      configuration.set(LOAD_MODEL, ObjectSerializationUtil.convertObjectToString(loadModel));
    +    }
    +  }
    +
    +  public static void setInputSchema(Configuration configuration, StructType inputSchema)
    +      throws IOException {
    +    if (inputSchema != null && inputSchema.getFields().size() > 0) {
    +      configuration.set(INPUT_SCHEMA, ObjectSerializationUtil.convertObjectToString(inputSchema));
    +    } else {
    +      throw new UnsupportedOperationException("Input schema must be set");
    +    }
    +  }
    +
    +  private static StructType getInputSchema(Configuration configuration) throws IOException {
    +    String encodedString = configuration.get(INPUT_SCHEMA);
    +    if (encodedString != null) {
    +      return (StructType) ObjectSerializationUtil.convertStringToObject(encodedString);
    +    }
         return null;
       }
    +
    +  public static boolean isOverwriteSet(Configuration configuration) {
    +    String overwrite = configuration.get(OVERWRITE_SET);
    +    if (overwrite != null) {
    +      return Boolean.parseBoolean(overwrite);
    +    }
    +    return false;
    +  }
    +
    +  public static void setOverwrite(Configuration configuration, boolean overwrite) {
    +    configuration.set(OVERWRITE_SET, String.valueOf(overwrite));
    +  }
    +
    +  public static void setTempStoreLocations(Configuration configuration, String[] tempLocations)
    +      throws IOException {
    +    if (tempLocations != null && tempLocations.length > 0) {
    +      configuration
    +          .set(TEMP_STORE_LOCATIONS, ObjectSerializationUtil.convertObjectToString(tempLocations));
    +    }
    +  }
    +
    +  private static String[] getTempStoreLocations(TaskAttemptContext taskAttemptContext)
    +      throws IOException {
    +    String encodedString = taskAttemptContext.getConfiguration().get(TEMP_STORE_LOCATIONS);
    +    if (encodedString != null) {
    +      return (String[]) ObjectSerializationUtil.convertStringToObject(encodedString);
    +    }
    +    return new String[] {
    +        System.getProperty("java.io.tmpdir") + "/" + System.nanoTime() + "_" + taskAttemptContext
    +            .getTaskAttemptID().toString() };
    +  }
    +
    +  @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context)
    +      throws IOException {
    +    if (this.committer == null) {
    +      Path output = getOutputPath(context);
    +      this.committer = new CarbonOutputCommitter(output, context);
    +    }
    +
    +    return this.committer;
    +  }
    +
    +  @Override public RecordWriter<NullWritable, StringArrayWritable> getRecordWriter(
    +      TaskAttemptContext taskAttemptContext) throws IOException {
    +    final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration());
    +    loadModel.setTaskNo(new Random().nextInt(Integer.MAX_VALUE) + "");
    +    final String[] tempStoreLocations = getTempStoreLocations(taskAttemptContext);
    +    final CarbonOutputIteratorWrapper iteratorWrapper = new CarbonOutputIteratorWrapper();
    +    final DataLoadExecutor dataLoadExecutor = new DataLoadExecutor();
    +    CarbonRecordWriter recordWriter = new CarbonRecordWriter(iteratorWrapper, dataLoadExecutor);
    +    new Thread() {
    +      @Override public void run() {
    +        try {
    +          dataLoadExecutor
    +              .execute(loadModel, tempStoreLocations, new CarbonIterator[] { iteratorWrapper });
    +        } catch (Exception e) {
    +          dataLoadExecutor.close();
    +          throw new RuntimeException(e);
    +        }
    +      }
    +    }.start();
    +
    +    return recordWriter;
    +  }
    +
    +  public static CarbonLoadModel getLoadModel(Configuration conf) throws IOException {
    +    CarbonLoadModel model;
    --- End diff --
   
    But it can be accessed only through a static method, so we can't cache it.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1642: [CARBONDATA-1855][PARTITION] Added outputform...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1642#discussion_r157333297
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java ---
    @@ -18,22 +18,311 @@
     package org.apache.carbondata.hadoop.api;
     
     import java.io.IOException;
    +import java.util.List;
    +import java.util.Random;
     
    -import org.apache.hadoop.fs.FileSystem;
    -import org.apache.hadoop.mapred.FileOutputFormat;
    -import org.apache.hadoop.mapred.JobConf;
    -import org.apache.hadoop.mapred.RecordWriter;
    -import org.apache.hadoop.util.Progressable;
    +import org.apache.carbondata.common.CarbonIterator;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
    +import org.apache.carbondata.core.metadata.datatype.StructField;
    +import org.apache.carbondata.core.metadata.datatype.StructType;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.TableInfo;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
    +import org.apache.carbondata.processing.loading.DataLoadExecutor;
    +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
    +import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper;
    +import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.NullWritable;
    +import org.apache.hadoop.mapreduce.OutputCommitter;
    +import org.apache.hadoop.mapreduce.RecordWriter;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     
     /**
      * Base class for all output format for CarbonData file.
    - * @param <T>
      */
    -public abstract class CarbonTableOutputFormat<T> extends FileOutputFormat<Void, T> {
    +public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, StringArrayWritable> {
    +
    +  private static final String LOAD_MODEL = "mapreduce.carbonoutputformat.load.mmodel";
    +  private static final String DATABASE_NAME = "mapreduce.carbonoutputformat.databaseName";
    +  private static final String TABLE_NAME = "mapreduce.carbonoutputformat.tableName";
    +  private static final String TABLE = "mapreduce.carbonoutputformat.table";
    +  private static final String TABLE_PATH = "mapreduce.carbonoutputformat.tablepath";
    +  private static final String INPUT_SCHEMA = "mapreduce.carbonoutputformat.inputschema";
    +  private static final String TEMP_STORE_LOCATIONS = "carbon.load.tempstore.locations";
    +  private static final String OVERWRITE_SET = "carbon.load.set.overwrite";
    +  public static final String COMPLEX_DELIMITERS = "mapreduce.carbonoutputformat.complex_delimiters";
    +  public static final String SERIALIZATION_NULL_FORMAT =
    +      "mapreduce.carbonoutputformat.serialization.null.format";
    +  public static final String BAD_RECORDS_LOGGER_ENABLE =
    +      "mapreduce.carbonoutputformat.bad.records.logger.enable";
    +  public static final String BAD_RECORDS_LOGGER_ACTION =
    +      "mapreduce.carbonoutputformat.bad.records.logger.action";
    +  public static final String IS_EMPTY_DATA_BAD_RECORD =
    +      "mapreduce.carbonoutputformat.empty.data.bad.record";
    +  public static final String SKIP_EMPTY_LINE = "mapreduce.carbonoutputformat.skip.empty.line";
    +  public static final String SORT_SCOPE = "mapreduce.carbonoutputformat.load.sort.scope";
    +  public static final String BATCH_SORT_SIZE_INMB =
    +      "mapreduce.carbonoutputformat.batch.sort.size.inmb";
    +  public static final String GLOBAL_SORT_PARTITIONS =
    +      "mapreduce.carbonoutputformat.global.sort.partitions";
    +  public static final String BAD_RECORD_PATH = "mapreduce.carbonoutputformat.bad.record.path";
    +  public static final String DATE_FORMAT = "mapreduce.carbonoutputformat.date.format";
    +  public static final String TIMESTAMP_FORMAT = "mapreduce.carbonoutputformat.timestamp.format";
    +  public static final String IS_ONE_PASS_LOAD = "mapreduce.carbonoutputformat.one.pass.load";
    +  public static final String DICTIONARY_SERVER_HOST =
    +      "mapreduce.carbonoutputformat.dict.server.host";
    +  public static final String DICTIONARY_SERVER_PORT =
    +      "mapreduce.carbonoutputformat.dict.server.port";
    +
    +  private CarbonOutputCommitter committer;
    +
    +  public static void setDatabaseName(Configuration configuration, String databaseName) {
    +    if (null != databaseName) {
    +      configuration.set(DATABASE_NAME, databaseName);
    +    }
    +  }
    +
    +  public static String getDatabaseName(Configuration configuration) {
    +    return configuration.get(DATABASE_NAME);
    +  }
    +
    +  public static void setTableName(Configuration configuration, String tableName) {
    +    if (null != tableName) {
    +      configuration.set(TABLE_NAME, tableName);
    +    }
    +  }
    +
    +  public static String getTableName(Configuration configuration) {
    +    return configuration.get(TABLE_NAME);
    +  }
    +
    +  public static void setTablePath(Configuration configuration, String tablePath) {
    +    if (null != tablePath) {
    +      configuration.set(TABLE_PATH, tablePath);
    +    }
    +  }
    +
    +  public static String getTablePath(Configuration configuration) {
    +    return configuration.get(TABLE_PATH);
    +  }
     
    -  @Override
    -  public RecordWriter<Void, T> getRecordWriter(FileSystem ignored, JobConf job, String name,
    -      Progressable progress) throws IOException {
    +  public static void setCarbonTable(Configuration configuration, CarbonTable carbonTable)
    +      throws IOException {
    +    if (carbonTable != null) {
    +      configuration.set(TABLE,
    +          ObjectSerializationUtil.convertObjectToString(carbonTable.getTableInfo().serialize()));
    +    }
    +  }
    +
    +  public static CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
    +    CarbonTable carbonTable = null;
    +    String encodedString = configuration.get(TABLE);
    +    if (encodedString != null) {
    +      byte[] bytes = (byte[]) ObjectSerializationUtil.convertStringToObject(encodedString);
    +      TableInfo tableInfo = TableInfo.deserialize(bytes);
    +      carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
    +    }
    +    return carbonTable;
    +  }
    +
    +  public static void setLoadModel(Configuration configuration, CarbonLoadModel loadModel)
    +      throws IOException {
    +    if (loadModel != null) {
    +      configuration.set(LOAD_MODEL, ObjectSerializationUtil.convertObjectToString(loadModel));
    +    }
    +  }
    +
    +  public static void setInputSchema(Configuration configuration, StructType inputSchema)
    +      throws IOException {
    +    if (inputSchema != null && inputSchema.getFields().size() > 0) {
    +      configuration.set(INPUT_SCHEMA, ObjectSerializationUtil.convertObjectToString(inputSchema));
    +    } else {
    +      throw new UnsupportedOperationException("Input schema must be set");
    +    }
    +  }
    +
    +  private static StructType getInputSchema(Configuration configuration) throws IOException {
    +    String encodedString = configuration.get(INPUT_SCHEMA);
    +    if (encodedString != null) {
    +      return (StructType) ObjectSerializationUtil.convertStringToObject(encodedString);
    +    }
         return null;
       }
    +
    +  public static boolean isOverwriteSet(Configuration configuration) {
    +    String overwrite = configuration.get(OVERWRITE_SET);
    +    if (overwrite != null) {
    +      return Boolean.parseBoolean(overwrite);
    +    }
    +    return false;
    +  }
    +
    +  public static void setOverwrite(Configuration configuration, boolean overwrite) {
    +    configuration.set(OVERWRITE_SET, String.valueOf(overwrite));
    +  }
    +
    +  public static void setTempStoreLocations(Configuration configuration, String[] tempLocations)
    +      throws IOException {
    +    if (tempLocations != null && tempLocations.length > 0) {
    +      configuration
    +          .set(TEMP_STORE_LOCATIONS, ObjectSerializationUtil.convertObjectToString(tempLocations));
    +    }
    +  }
    +
    +  private static String[] getTempStoreLocations(TaskAttemptContext taskAttemptContext)
    +      throws IOException {
    +    String encodedString = taskAttemptContext.getConfiguration().get(TEMP_STORE_LOCATIONS);
    +    if (encodedString != null) {
    +      return (String[]) ObjectSerializationUtil.convertStringToObject(encodedString);
    +    }
    +    return new String[] {
    +        System.getProperty("java.io.tmpdir") + "/" + System.nanoTime() + "_" + taskAttemptContext
    +            .getTaskAttemptID().toString() };
    +  }
    +
    +  @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context)
    +      throws IOException {
    +    if (this.committer == null) {
    +      Path output = getOutputPath(context);
    +      this.committer = new CarbonOutputCommitter(output, context);
    +    }
    +
    +    return this.committer;
    +  }
    +
    +  @Override public RecordWriter<NullWritable, StringArrayWritable> getRecordWriter(
    +      TaskAttemptContext taskAttemptContext) throws IOException {
    +    final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration());
    +    loadModel.setTaskNo(new Random().nextInt(Integer.MAX_VALUE) + "");
    +    final String[] tempStoreLocations = getTempStoreLocations(taskAttemptContext);
    +    final CarbonOutputIteratorWrapper iteratorWrapper = new CarbonOutputIteratorWrapper();
    +    final DataLoadExecutor dataLoadExecutor = new DataLoadExecutor();
    +    CarbonRecordWriter recordWriter = new CarbonRecordWriter(iteratorWrapper, dataLoadExecutor);
    +    new Thread() {
    +      @Override public void run() {
    +        try {
    +          dataLoadExecutor
    +              .execute(loadModel, tempStoreLocations, new CarbonIterator[] { iteratorWrapper });
    +        } catch (Exception e) {
    +          dataLoadExecutor.close();
    +          throw new RuntimeException(e);
    +        }
    +      }
    +    }.start();
    +
    +    return recordWriter;
    +  }
    +
    +  public static CarbonLoadModel getLoadModel(Configuration conf) throws IOException {
    +    CarbonLoadModel model;
    +    String encodedString = conf.get(LOAD_MODEL);
    +    if (encodedString != null) {
    +      model = (CarbonLoadModel) ObjectSerializationUtil.convertStringToObject(encodedString);
    +      return model;
    +    }
    +    model = new CarbonLoadModel();
    +    CarbonProperties carbonProperty = CarbonProperties.getInstance();
    +    model.setDatabaseName(CarbonTableOutputFormat.getDatabaseName(conf));
    +    model.setTableName(CarbonTableOutputFormat.getTableName(conf));
    +    model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(getOrCreateCarbonTable(conf)));
    +    model.setTablePath(getTablePath(conf));
    +
    +    setFileHeader(conf, model);
    +    model.setSerializationNullFormat(conf.get(SERIALIZATION_NULL_FORMAT, "\\N"));
    +    model.setBadRecordsLoggerEnable(conf.get(BAD_RECORDS_LOGGER_ENABLE, carbonProperty
    +        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
    +            CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT)));
    +    model.setBadRecordsAction(conf.get(BAD_RECORDS_LOGGER_ACTION, carbonProperty
    +        .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
    +            CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)));
    +
    +    model.setIsEmptyDataBadRecord(conf.get(IS_EMPTY_DATA_BAD_RECORD, carbonProperty
    +        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
    +            CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT)));
    +
    +    model.setSkipEmptyLine(conf.get(SKIP_EMPTY_LINE,
    +        carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SKIP_EMPTY_LINE)));
    +
    +    String complexDelim = conf.get(COMPLEX_DELIMITERS, "\\$" + "," + "\\:");
    +    String[] split = complexDelim.split(",");
    +    model.setComplexDelimiterLevel1(split[0]);
    +    if (split.length > 1) {
    +      model.setComplexDelimiterLevel1(split[1]);
    +    }
    +    model.setDateFormat(conf.get(DATE_FORMAT, carbonProperty
    --- End diff --
   
    ok


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1642: [CARBONDATA-1855][PARTITION] Added outputform...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1642#discussion_r157333341
 
    --- Diff: hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java ---
    @@ -0,0 +1,122 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.hadoop.ft;
    +
    +import java.io.File;
    +import java.io.FilenameFilter;
    +import java.io.IOException;
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
    +import org.apache.carbondata.hadoop.test.util.StoreCreator;
    +import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
    +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    +
    +import junit.framework.TestCase;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.NullWritable;
    +import org.apache.hadoop.mapreduce.Job;
    +import org.apache.hadoop.mapreduce.Mapper;
    +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    +import org.junit.Test;
    +
    +public class CarbonOutputMapperTest extends TestCase {
    +
    +  CarbonLoadModel carbonLoadModel;
    +
    +  // changed setUp to static init block to avoid un wanted multiple time store creation
    +  static {
    +    CarbonProperties.getInstance().
    +        addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords");
    +  }
    +
    +
    +  @Test public void testOutputFormat() throws Exception {
    +    runJob("");
    +    String segmentPath = CarbonTablePath.getSegmentPath(carbonLoadModel.getTablePath(), "0");
    +    File file = new File(segmentPath);
    +    assert (file.exists());
    +    File[] listFiles = file.listFiles(new FilenameFilter() {
    +      @Override public boolean accept(File dir, String name) {
    +        return name.endsWith(".carbondata") || name.endsWith(".carbonindex");
    +      }
    +    });
    +
    +    assert (listFiles.length == 2);
    +
    +  }
    +
    +
    +  @Override public void tearDown() throws Exception {
    +    super.tearDown();
    +    CarbonProperties.getInstance()
    +        .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true");
    +  }
    +
    +  @Override public void setUp() throws Exception {
    +    super.setUp();
    +    CarbonProperties.getInstance()
    +        .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "false");
    +    carbonLoadModel = StoreCreator.getCarbonLoadModel();
    +  }
    +
    + public static class Map extends Mapper<NullWritable, StringArrayWritable, NullWritable, StringArrayWritable> {
    +
    +   @Override protected void map(NullWritable key, StringArrayWritable value, Context context)
    +       throws IOException, InterruptedException {
    +     context.write(key, value);
    +   }
    + }
    +
    +  private void runJob(String outPath)
    +      throws Exception {
    --- End diff --
   
    It is a test case.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1642: [CARBONDATA-1855][PARTITION] Added outputform...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1642#discussion_r157333346
 
    --- Diff: hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java ---
    @@ -0,0 +1,122 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.hadoop.ft;
    +
    +import java.io.File;
    +import java.io.FilenameFilter;
    +import java.io.IOException;
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
    +import org.apache.carbondata.hadoop.test.util.StoreCreator;
    +import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
    +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    +
    +import junit.framework.TestCase;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.NullWritable;
    +import org.apache.hadoop.mapreduce.Job;
    +import org.apache.hadoop.mapreduce.Mapper;
    +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    +import org.junit.Test;
    +
    +public class CarbonOutputMapperTest extends TestCase {
    +
    +  CarbonLoadModel carbonLoadModel;
    +
    +  // changed setUp to static init block to avoid un wanted multiple time store creation
    +  static {
    +    CarbonProperties.getInstance().
    +        addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords");
    +  }
    +
    +
    +  @Test public void testOutputFormat() throws Exception {
    +    runJob("");
    +    String segmentPath = CarbonTablePath.getSegmentPath(carbonLoadModel.getTablePath(), "0");
    +    File file = new File(segmentPath);
    +    assert (file.exists());
    +    File[] listFiles = file.listFiles(new FilenameFilter() {
    +      @Override public boolean accept(File dir, String name) {
    +        return name.endsWith(".carbondata") || name.endsWith(".carbonindex");
    +      }
    +    });
    +
    +    assert (listFiles.length == 2);
    +
    +  }
    +
    +
    +  @Override public void tearDown() throws Exception {
    +    super.tearDown();
    +    CarbonProperties.getInstance()
    +        .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true");
    +  }
    +
    +  @Override public void setUp() throws Exception {
    +    super.setUp();
    +    CarbonProperties.getInstance()
    +        .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "false");
    +    carbonLoadModel = StoreCreator.getCarbonLoadModel();
    +  }
    +
    + public static class Map extends Mapper<NullWritable, StringArrayWritable, NullWritable, StringArrayWritable> {
    +
    +   @Override protected void map(NullWritable key, StringArrayWritable value, Context context)
    +       throws IOException, InterruptedException {
    +     context.write(key, value);
    +   }
    + }
    +
    +  private void runJob(String outPath)
    +      throws Exception {
    --- End diff --
   
    ok


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1642: [CARBONDATA-1855][PARTITION] Added outputform...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1642#discussion_r157333357
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.processing.loading.iterator;
    +
    +import java.util.concurrent.ArrayBlockingQueue;
    +
    +import org.apache.carbondata.common.CarbonIterator;
    +
    +public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> {
    +
    +  private boolean close = false;
    +
    +  /**
    +   * Number of rows kept in memory at most will be batchSize * queue size
    +   */
    +  private int batchSize = 1000;
    +
    +  private RowBatch loadBatch = new RowBatch(batchSize);
    +
    +  private RowBatch readBatch;
    +
    +  private ArrayBlockingQueue<RowBatch> queue = new ArrayBlockingQueue<>(10);
    +
    +  public void write(String[] row) throws InterruptedException {
    +
    --- End diff --
   
    ok


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1642: [CARBONDATA-1855][PARTITION] Added outputform...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1642#discussion_r157333364
 
    --- Diff: hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java ---
    @@ -0,0 +1,122 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.hadoop.ft;
    +
    +import java.io.File;
    +import java.io.FilenameFilter;
    +import java.io.IOException;
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
    +import org.apache.carbondata.hadoop.test.util.StoreCreator;
    +import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
    +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    +
    +import junit.framework.TestCase;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.NullWritable;
    +import org.apache.hadoop.mapreduce.Job;
    +import org.apache.hadoop.mapreduce.Mapper;
    +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    +import org.junit.Test;
    +
    +public class CarbonOutputMapperTest extends TestCase {
    +
    +  CarbonLoadModel carbonLoadModel;
    +
    +  // changed setUp to static init block to avoid un wanted multiple time store creation
    +  static {
    +    CarbonProperties.getInstance().
    +        addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords");
    +  }
    +
    +
    +  @Test public void testOutputFormat() throws Exception {
    +    runJob("");
    +    String segmentPath = CarbonTablePath.getSegmentPath(carbonLoadModel.getTablePath(), "0");
    +    File file = new File(segmentPath);
    +    assert (file.exists());
    +    File[] listFiles = file.listFiles(new FilenameFilter() {
    +      @Override public boolean accept(File dir, String name) {
    +        return name.endsWith(".carbondata") || name.endsWith(".carbonindex");
    +      }
    +    });
    +
    +    assert (listFiles.length == 2);
    +
    +  }
    +
    +
    +  @Override public void tearDown() throws Exception {
    +    super.tearDown();
    +    CarbonProperties.getInstance()
    +        .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true");
    +  }
    +
    +  @Override public void setUp() throws Exception {
    +    super.setUp();
    +    CarbonProperties.getInstance()
    +        .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "false");
    +    carbonLoadModel = StoreCreator.getCarbonLoadModel();
    +  }
    +
    + public static class Map extends Mapper<NullWritable, StringArrayWritable, NullWritable, StringArrayWritable> {
    +
    +   @Override protected void map(NullWritable key, StringArrayWritable value, Context context)
    +       throws IOException, InterruptedException {
    +     context.write(key, value);
    +   }
    + }
    +
    +  private void runJob(String outPath)
    +      throws Exception {
    +
    +    Configuration configuration = new Configuration();
    +    configuration.set("mapreduce.cluster.local.dir", new File(outPath + "1").getCanonicalPath());
    +    Job job = Job.getInstance(configuration);
    +    job.setJarByClass(CarbonOutputMapperTest.class);
    +    job.setOutputKeyClass(NullWritable.class);
    +    job.setOutputValueClass(StringArrayWritable.class);
    +    job.setMapperClass(Map.class);
    +    job.setNumReduceTasks(0);
    +
    +    FileInputFormat.addInputPath(job, new Path(carbonLoadModel.getFactFilePath()));
    +    CarbonTableOutputFormat.setLoadModel(job.getConfiguration(), carbonLoadModel);
    +    CarbonTableOutputFormat.setCarbonTable(job.getConfiguration(), carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable());
    +    CSVInputFormat.setHeaderExtractionEnabled(job.getConfiguration(), true);
    +    job.setInputFormatClass(CSVInputFormat.class);
    +    job.setOutputFormatClass(CarbonTableOutputFormat.class);
    +    CarbonUtil.deleteFoldersAndFiles(new File(carbonLoadModel.getTablePath() + "1"));
    +    FileOutputFormat.setOutputPath(job, new Path(carbonLoadModel.getTablePath() + "1"));
    +    job.getConfiguration().set("outpath", outPath);
    +    job.getConfiguration().set("query.id", String.valueOf(System.nanoTime()));
    +    job.waitForCompletion(true);
    +  }
    +
    +  public static void main(String[] args) throws Exception {
    +
    +    CarbonOutputMapperTest carbonOutputMapperTest = new CarbonOutputMapperTest();
    --- End diff --
   
    It is a test case; I have removed the main method here.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1642: [CARBONDATA-1855][PARTITION] Added outputform...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1642#discussion_r157333401
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.hadoop.api;
    +
    +import java.io.IOException;
    +
    +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
    +import org.apache.carbondata.core.statusmanager.SegmentStatus;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    +import org.apache.carbondata.processing.util.CarbonLoaderUtil;
    +
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.mapreduce.JobContext;
    +import org.apache.hadoop.mapreduce.JobStatus;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
    +
    +/**
    + * Outputcommitter which manages the segments during loading.
    + */
    +public class CarbonOutputCommitter extends FileOutputCommitter {
    +
    +  public CarbonOutputCommitter(Path outputPath, TaskAttemptContext context) throws IOException {
    +    super(outputPath, context);
    +  }
    +
    +  /**
    +   * Update the tablestatus with inprogress while setup the job.
    +   *
    +   * @param context
    +   * @throws IOException
    +   */
    +  @Override public void setupJob(JobContext context) throws IOException {
    +    super.setupJob(context);
    +    boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration());
    +    CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration());
    +    try {
    +      CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, overwriteSet);
    +    } catch (InterruptedException e) {
    +      throw new IOException(e);
    +    }
    +    CarbonTableOutputFormat.setLoadModel(context.getConfiguration(), loadModel);
    +  }
    +
    +  /**
    +   * Update the tablestatus as success after job is success
    +   *
    +   * @param context
    +   * @throws IOException
    +   */
    +  @Override public void commitJob(JobContext context) throws IOException {
    +    super.commitJob(context);
    +    boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration());
    +    CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration());
    +    try {
    +      LoadMetadataDetails newMetaEntry =
    +          loadModel.getLoadMetadataDetails().get(loadModel.getLoadMetadataDetails().size() - 1);
    +      CarbonLoaderUtil.populateNewLoadMetaEntry(newMetaEntry, SegmentStatus.SUCCESS,
    +          loadModel.getFactTimeStamp(), true);
    +      CarbonUtil.addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(),
    --- End diff --
   
    OK, moved.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1642: [CARBONDATA-1855][PARTITION] Added outputform...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1642#discussion_r157333417
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java ---
    @@ -18,22 +18,311 @@
     package org.apache.carbondata.hadoop.api;
     
     import java.io.IOException;
    +import java.util.List;
    +import java.util.Random;
     
    -import org.apache.hadoop.fs.FileSystem;
    -import org.apache.hadoop.mapred.FileOutputFormat;
    -import org.apache.hadoop.mapred.JobConf;
    -import org.apache.hadoop.mapred.RecordWriter;
    -import org.apache.hadoop.util.Progressable;
    +import org.apache.carbondata.common.CarbonIterator;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
    +import org.apache.carbondata.core.metadata.datatype.StructField;
    +import org.apache.carbondata.core.metadata.datatype.StructType;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.TableInfo;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
    +import org.apache.carbondata.processing.loading.DataLoadExecutor;
    +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
    +import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper;
    +import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.NullWritable;
    +import org.apache.hadoop.mapreduce.OutputCommitter;
    +import org.apache.hadoop.mapreduce.RecordWriter;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     
     /**
      * Base class for all output format for CarbonData file.
    - * @param <T>
      */
    -public abstract class CarbonTableOutputFormat<T> extends FileOutputFormat<Void, T> {
    +public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, StringArrayWritable> {
    +
    +  private static final String LOAD_MODEL = "mapreduce.carbonoutputformat.load.mmodel";
    +  private static final String DATABASE_NAME = "mapreduce.carbonoutputformat.databaseName";
    +  private static final String TABLE_NAME = "mapreduce.carbonoutputformat.tableName";
    +  private static final String TABLE = "mapreduce.carbonoutputformat.table";
    +  private static final String TABLE_PATH = "mapreduce.carbonoutputformat.tablepath";
    +  private static final String INPUT_SCHEMA = "mapreduce.carbonoutputformat.inputschema";
    +  private static final String TEMP_STORE_LOCATIONS = "carbon.load.tempstore.locations";
    +  private static final String OVERWRITE_SET = "carbon.load.set.overwrite";
    +  public static final String COMPLEX_DELIMITERS = "mapreduce.carbonoutputformat.complex_delimiters";
    +  public static final String SERIALIZATION_NULL_FORMAT =
    +      "mapreduce.carbonoutputformat.serialization.null.format";
    +  public static final String BAD_RECORDS_LOGGER_ENABLE =
    +      "mapreduce.carbonoutputformat.bad.records.logger.enable";
    +  public static final String BAD_RECORDS_LOGGER_ACTION =
    +      "mapreduce.carbonoutputformat.bad.records.logger.action";
    +  public static final String IS_EMPTY_DATA_BAD_RECORD =
    +      "mapreduce.carbonoutputformat.empty.data.bad.record";
    +  public static final String SKIP_EMPTY_LINE = "mapreduce.carbonoutputformat.skip.empty.line";
    +  public static final String SORT_SCOPE = "mapreduce.carbonoutputformat.load.sort.scope";
    +  public static final String BATCH_SORT_SIZE_INMB =
    +      "mapreduce.carbonoutputformat.batch.sort.size.inmb";
    +  public static final String GLOBAL_SORT_PARTITIONS =
    +      "mapreduce.carbonoutputformat.global.sort.partitions";
    +  public static final String BAD_RECORD_PATH = "mapreduce.carbonoutputformat.bad.record.path";
    +  public static final String DATE_FORMAT = "mapreduce.carbonoutputformat.date.format";
    +  public static final String TIMESTAMP_FORMAT = "mapreduce.carbonoutputformat.timestamp.format";
    +  public static final String IS_ONE_PASS_LOAD = "mapreduce.carbonoutputformat.one.pass.load";
    +  public static final String DICTIONARY_SERVER_HOST =
    +      "mapreduce.carbonoutputformat.dict.server.host";
    +  public static final String DICTIONARY_SERVER_PORT =
    +      "mapreduce.carbonoutputformat.dict.server.port";
    +
    +  private CarbonOutputCommitter committer;
    +
    +  public static void setDatabaseName(Configuration configuration, String databaseName) {
    +    if (null != databaseName) {
    +      configuration.set(DATABASE_NAME, databaseName);
    +    }
    +  }
    +
    +  public static String getDatabaseName(Configuration configuration) {
    +    return configuration.get(DATABASE_NAME);
    +  }
    +
    +  public static void setTableName(Configuration configuration, String tableName) {
    +    if (null != tableName) {
    +      configuration.set(TABLE_NAME, tableName);
    +    }
    +  }
    +
    +  public static String getTableName(Configuration configuration) {
    +    return configuration.get(TABLE_NAME);
    +  }
    +
    +  public static void setTablePath(Configuration configuration, String tablePath) {
    +    if (null != tablePath) {
    +      configuration.set(TABLE_PATH, tablePath);
    +    }
    +  }
    +
    +  public static String getTablePath(Configuration configuration) {
    +    return configuration.get(TABLE_PATH);
    +  }
     
    -  @Override
    -  public RecordWriter<Void, T> getRecordWriter(FileSystem ignored, JobConf job, String name,
    -      Progressable progress) throws IOException {
    +  public static void setCarbonTable(Configuration configuration, CarbonTable carbonTable)
    +      throws IOException {
    +    if (carbonTable != null) {
    +      configuration.set(TABLE,
    +          ObjectSerializationUtil.convertObjectToString(carbonTable.getTableInfo().serialize()));
    +    }
    +  }
    +
    +  public static CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
    +    CarbonTable carbonTable = null;
    +    String encodedString = configuration.get(TABLE);
    +    if (encodedString != null) {
    +      byte[] bytes = (byte[]) ObjectSerializationUtil.convertStringToObject(encodedString);
    +      TableInfo tableInfo = TableInfo.deserialize(bytes);
    +      carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
    +    }
    +    return carbonTable;
    +  }
    +
    +  public static void setLoadModel(Configuration configuration, CarbonLoadModel loadModel)
    +      throws IOException {
    +    if (loadModel != null) {
    +      configuration.set(LOAD_MODEL, ObjectSerializationUtil.convertObjectToString(loadModel));
    +    }
    +  }
    +
    +  public static void setInputSchema(Configuration configuration, StructType inputSchema)
    +      throws IOException {
    +    if (inputSchema != null && inputSchema.getFields().size() > 0) {
    +      configuration.set(INPUT_SCHEMA, ObjectSerializationUtil.convertObjectToString(inputSchema));
    +    } else {
    +      throw new UnsupportedOperationException("Input schema must be set");
    +    }
    +  }
    +
    +  private static StructType getInputSchema(Configuration configuration) throws IOException {
    +    String encodedString = configuration.get(INPUT_SCHEMA);
    +    if (encodedString != null) {
    +      return (StructType) ObjectSerializationUtil.convertStringToObject(encodedString);
    +    }
         return null;
       }
    +
    +  public static boolean isOverwriteSet(Configuration configuration) {
    +    String overwrite = configuration.get(OVERWRITE_SET);
    +    if (overwrite != null) {
    +      return Boolean.parseBoolean(overwrite);
    +    }
    +    return false;
    +  }
    +
    +  public static void setOverwrite(Configuration configuration, boolean overwrite) {
    +    configuration.set(OVERWRITE_SET, String.valueOf(overwrite));
    +  }
    +
    +  public static void setTempStoreLocations(Configuration configuration, String[] tempLocations)
    +      throws IOException {
    +    if (tempLocations != null && tempLocations.length > 0) {
    +      configuration
    +          .set(TEMP_STORE_LOCATIONS, ObjectSerializationUtil.convertObjectToString(tempLocations));
    +    }
    +  }
    +
    +  private static String[] getTempStoreLocations(TaskAttemptContext taskAttemptContext)
    +      throws IOException {
    +    String encodedString = taskAttemptContext.getConfiguration().get(TEMP_STORE_LOCATIONS);
    +    if (encodedString != null) {
    +      return (String[]) ObjectSerializationUtil.convertStringToObject(encodedString);
    +    }
    +    return new String[] {
    +        System.getProperty("java.io.tmpdir") + "/" + System.nanoTime() + "_" + taskAttemptContext
    +            .getTaskAttemptID().toString() };
    +  }
    +
    +  @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context)
    +      throws IOException {
    +    if (this.committer == null) {
    +      Path output = getOutputPath(context);
    +      this.committer = new CarbonOutputCommitter(output, context);
    +    }
    +
    --- End diff --
   
    ok


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1642: [CARBONDATA-1855][PARTITION] Added outputform...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1642#discussion_r157333418
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java ---
    @@ -18,22 +18,311 @@
     package org.apache.carbondata.hadoop.api;
     
     import java.io.IOException;
    +import java.util.List;
    +import java.util.Random;
     
    -import org.apache.hadoop.fs.FileSystem;
    -import org.apache.hadoop.mapred.FileOutputFormat;
    -import org.apache.hadoop.mapred.JobConf;
    -import org.apache.hadoop.mapred.RecordWriter;
    -import org.apache.hadoop.util.Progressable;
    +import org.apache.carbondata.common.CarbonIterator;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
    +import org.apache.carbondata.core.metadata.datatype.StructField;
    +import org.apache.carbondata.core.metadata.datatype.StructType;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.TableInfo;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
    +import org.apache.carbondata.processing.loading.DataLoadExecutor;
    +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
    +import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper;
    +import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.NullWritable;
    +import org.apache.hadoop.mapreduce.OutputCommitter;
    +import org.apache.hadoop.mapreduce.RecordWriter;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     
     /**
      * Base class for all output format for CarbonData file.
    - * @param <T>
      */
    -public abstract class CarbonTableOutputFormat<T> extends FileOutputFormat<Void, T> {
    +public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, StringArrayWritable> {
    +
    +  private static final String LOAD_MODEL = "mapreduce.carbonoutputformat.load.mmodel";
    +  private static final String DATABASE_NAME = "mapreduce.carbonoutputformat.databaseName";
    +  private static final String TABLE_NAME = "mapreduce.carbonoutputformat.tableName";
    +  private static final String TABLE = "mapreduce.carbonoutputformat.table";
    +  private static final String TABLE_PATH = "mapreduce.carbonoutputformat.tablepath";
    +  private static final String INPUT_SCHEMA = "mapreduce.carbonoutputformat.inputschema";
    +  private static final String TEMP_STORE_LOCATIONS = "carbon.load.tempstore.locations";
    +  private static final String OVERWRITE_SET = "carbon.load.set.overwrite";
    +  public static final String COMPLEX_DELIMITERS = "mapreduce.carbonoutputformat.complex_delimiters";
    +  public static final String SERIALIZATION_NULL_FORMAT =
    +      "mapreduce.carbonoutputformat.serialization.null.format";
    +  public static final String BAD_RECORDS_LOGGER_ENABLE =
    +      "mapreduce.carbonoutputformat.bad.records.logger.enable";
    +  public static final String BAD_RECORDS_LOGGER_ACTION =
    +      "mapreduce.carbonoutputformat.bad.records.logger.action";
    +  public static final String IS_EMPTY_DATA_BAD_RECORD =
    +      "mapreduce.carbonoutputformat.empty.data.bad.record";
    +  public static final String SKIP_EMPTY_LINE = "mapreduce.carbonoutputformat.skip.empty.line";
    +  public static final String SORT_SCOPE = "mapreduce.carbonoutputformat.load.sort.scope";
    +  public static final String BATCH_SORT_SIZE_INMB =
    +      "mapreduce.carbonoutputformat.batch.sort.size.inmb";
    +  public static final String GLOBAL_SORT_PARTITIONS =
    +      "mapreduce.carbonoutputformat.global.sort.partitions";
    +  public static final String BAD_RECORD_PATH = "mapreduce.carbonoutputformat.bad.record.path";
    +  public static final String DATE_FORMAT = "mapreduce.carbonoutputformat.date.format";
    +  public static final String TIMESTAMP_FORMAT = "mapreduce.carbonoutputformat.timestamp.format";
    +  public static final String IS_ONE_PASS_LOAD = "mapreduce.carbonoutputformat.one.pass.load";
    +  public static final String DICTIONARY_SERVER_HOST =
    +      "mapreduce.carbonoutputformat.dict.server.host";
    +  public static final String DICTIONARY_SERVER_PORT =
    +      "mapreduce.carbonoutputformat.dict.server.port";
    +
    +  private CarbonOutputCommitter committer;
    +
    +  public static void setDatabaseName(Configuration configuration, String databaseName) {
    +    if (null != databaseName) {
    +      configuration.set(DATABASE_NAME, databaseName);
    +    }
    +  }
    +
    +  public static String getDatabaseName(Configuration configuration) {
    +    return configuration.get(DATABASE_NAME);
    +  }
    +
    +  public static void setTableName(Configuration configuration, String tableName) {
    +    if (null != tableName) {
    +      configuration.set(TABLE_NAME, tableName);
    +    }
    +  }
    +
    +  public static String getTableName(Configuration configuration) {
    +    return configuration.get(TABLE_NAME);
    +  }
    +
    +  public static void setTablePath(Configuration configuration, String tablePath) {
    +    if (null != tablePath) {
    +      configuration.set(TABLE_PATH, tablePath);
    +    }
    +  }
    +
    +  public static String getTablePath(Configuration configuration) {
    +    return configuration.get(TABLE_PATH);
    +  }
     
    -  @Override
    -  public RecordWriter<Void, T> getRecordWriter(FileSystem ignored, JobConf job, String name,
    -      Progressable progress) throws IOException {
    +  public static void setCarbonTable(Configuration configuration, CarbonTable carbonTable)
    +      throws IOException {
    +    if (carbonTable != null) {
    +      configuration.set(TABLE,
    +          ObjectSerializationUtil.convertObjectToString(carbonTable.getTableInfo().serialize()));
    +    }
    +  }
    +
    +  public static CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
    +    CarbonTable carbonTable = null;
    +    String encodedString = configuration.get(TABLE);
    +    if (encodedString != null) {
    +      byte[] bytes = (byte[]) ObjectSerializationUtil.convertStringToObject(encodedString);
    +      TableInfo tableInfo = TableInfo.deserialize(bytes);
    +      carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
    +    }
    +    return carbonTable;
    +  }
    +
    +  public static void setLoadModel(Configuration configuration, CarbonLoadModel loadModel)
    +      throws IOException {
    +    if (loadModel != null) {
    +      configuration.set(LOAD_MODEL, ObjectSerializationUtil.convertObjectToString(loadModel));
    +    }
    +  }
    +
    +  public static void setInputSchema(Configuration configuration, StructType inputSchema)
    +      throws IOException {
    +    if (inputSchema != null && inputSchema.getFields().size() > 0) {
    +      configuration.set(INPUT_SCHEMA, ObjectSerializationUtil.convertObjectToString(inputSchema));
    +    } else {
    +      throw new UnsupportedOperationException("Input schema must be set");
    +    }
    +  }
    +
    +  private static StructType getInputSchema(Configuration configuration) throws IOException {
    +    String encodedString = configuration.get(INPUT_SCHEMA);
    +    if (encodedString != null) {
    +      return (StructType) ObjectSerializationUtil.convertStringToObject(encodedString);
    +    }
         return null;
       }
    +
    +  public static boolean isOverwriteSet(Configuration configuration) {
    +    String overwrite = configuration.get(OVERWRITE_SET);
    +    if (overwrite != null) {
    +      return Boolean.parseBoolean(overwrite);
    +    }
    +    return false;
    +  }
    +
    +  public static void setOverwrite(Configuration configuration, boolean overwrite) {
    +    configuration.set(OVERWRITE_SET, String.valueOf(overwrite));
    +  }
    +
    +  public static void setTempStoreLocations(Configuration configuration, String[] tempLocations)
    +      throws IOException {
    +    if (tempLocations != null && tempLocations.length > 0) {
    +      configuration
    +          .set(TEMP_STORE_LOCATIONS, ObjectSerializationUtil.convertObjectToString(tempLocations));
    +    }
    +  }
    +
    +  private static String[] getTempStoreLocations(TaskAttemptContext taskAttemptContext)
    +      throws IOException {
    +    String encodedString = taskAttemptContext.getConfiguration().get(TEMP_STORE_LOCATIONS);
    +    if (encodedString != null) {
    +      return (String[]) ObjectSerializationUtil.convertStringToObject(encodedString);
    +    }
    +    return new String[] {
    +        System.getProperty("java.io.tmpdir") + "/" + System.nanoTime() + "_" + taskAttemptContext
    +            .getTaskAttemptID().toString() };
    +  }
    +
    +  @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context)
    --- End diff --
   
    ok


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1642: [CARBONDATA-1855][PARTITION] Added outputform...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1642#discussion_r157333420
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java ---
    @@ -18,22 +18,311 @@
     package org.apache.carbondata.hadoop.api;
     
     import java.io.IOException;
    +import java.util.List;
    +import java.util.Random;
     
    -import org.apache.hadoop.fs.FileSystem;
    -import org.apache.hadoop.mapred.FileOutputFormat;
    -import org.apache.hadoop.mapred.JobConf;
    -import org.apache.hadoop.mapred.RecordWriter;
    -import org.apache.hadoop.util.Progressable;
    +import org.apache.carbondata.common.CarbonIterator;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
    +import org.apache.carbondata.core.metadata.datatype.StructField;
    +import org.apache.carbondata.core.metadata.datatype.StructType;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.TableInfo;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
    +import org.apache.carbondata.processing.loading.DataLoadExecutor;
    +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
    +import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper;
    +import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.NullWritable;
    +import org.apache.hadoop.mapreduce.OutputCommitter;
    +import org.apache.hadoop.mapreduce.RecordWriter;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     
     /**
      * Base class for all output format for CarbonData file.
    - * @param <T>
      */
    -public abstract class CarbonTableOutputFormat<T> extends FileOutputFormat<Void, T> {
    +public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, StringArrayWritable> {
    +
    +  private static final String LOAD_MODEL = "mapreduce.carbonoutputformat.load.mmodel";
    +  private static final String DATABASE_NAME = "mapreduce.carbonoutputformat.databaseName";
    +  private static final String TABLE_NAME = "mapreduce.carbonoutputformat.tableName";
    +  private static final String TABLE = "mapreduce.carbonoutputformat.table";
    +  private static final String TABLE_PATH = "mapreduce.carbonoutputformat.tablepath";
    +  private static final String INPUT_SCHEMA = "mapreduce.carbonoutputformat.inputschema";
    +  private static final String TEMP_STORE_LOCATIONS = "carbon.load.tempstore.locations";
    +  private static final String OVERWRITE_SET = "carbon.load.set.overwrite";
    +  public static final String COMPLEX_DELIMITERS = "mapreduce.carbonoutputformat.complex_delimiters";
    +  public static final String SERIALIZATION_NULL_FORMAT =
    +      "mapreduce.carbonoutputformat.serialization.null.format";
    +  public static final String BAD_RECORDS_LOGGER_ENABLE =
    +      "mapreduce.carbonoutputformat.bad.records.logger.enable";
    +  public static final String BAD_RECORDS_LOGGER_ACTION =
    +      "mapreduce.carbonoutputformat.bad.records.logger.action";
    +  public static final String IS_EMPTY_DATA_BAD_RECORD =
    +      "mapreduce.carbonoutputformat.empty.data.bad.record";
    +  public static final String SKIP_EMPTY_LINE = "mapreduce.carbonoutputformat.skip.empty.line";
    +  public static final String SORT_SCOPE = "mapreduce.carbonoutputformat.load.sort.scope";
    +  public static final String BATCH_SORT_SIZE_INMB =
    +      "mapreduce.carbonoutputformat.batch.sort.size.inmb";
    +  public static final String GLOBAL_SORT_PARTITIONS =
    +      "mapreduce.carbonoutputformat.global.sort.partitions";
    +  public static final String BAD_RECORD_PATH = "mapreduce.carbonoutputformat.bad.record.path";
    +  public static final String DATE_FORMAT = "mapreduce.carbonoutputformat.date.format";
    +  public static final String TIMESTAMP_FORMAT = "mapreduce.carbonoutputformat.timestamp.format";
    +  public static final String IS_ONE_PASS_LOAD = "mapreduce.carbonoutputformat.one.pass.load";
    +  public static final String DICTIONARY_SERVER_HOST =
    +      "mapreduce.carbonoutputformat.dict.server.host";
    +  public static final String DICTIONARY_SERVER_PORT =
    +      "mapreduce.carbonoutputformat.dict.server.port";
    +
    +  private CarbonOutputCommitter committer;
    +
    +  public static void setDatabaseName(Configuration configuration, String databaseName) {
    +    if (null != databaseName) {
    +      configuration.set(DATABASE_NAME, databaseName);
    +    }
    +  }
    +
    +  public static String getDatabaseName(Configuration configuration) {
    +    return configuration.get(DATABASE_NAME);
    +  }
    +
    +  public static void setTableName(Configuration configuration, String tableName) {
    +    if (null != tableName) {
    +      configuration.set(TABLE_NAME, tableName);
    +    }
    +  }
    +
    +  public static String getTableName(Configuration configuration) {
    +    return configuration.get(TABLE_NAME);
    +  }
    +
    +  public static void setTablePath(Configuration configuration, String tablePath) {
    +    if (null != tablePath) {
    +      configuration.set(TABLE_PATH, tablePath);
    +    }
    +  }
    +
    +  public static String getTablePath(Configuration configuration) {
    +    return configuration.get(TABLE_PATH);
    +  }
     
    -  @Override
    -  public RecordWriter<Void, T> getRecordWriter(FileSystem ignored, JobConf job, String name,
    -      Progressable progress) throws IOException {
    +  public static void setCarbonTable(Configuration configuration, CarbonTable carbonTable)
    +      throws IOException {
    +    if (carbonTable != null) {
    +      configuration.set(TABLE,
    +          ObjectSerializationUtil.convertObjectToString(carbonTable.getTableInfo().serialize()));
    +    }
    +  }
    +
    +  public static CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
    +    CarbonTable carbonTable = null;
    +    String encodedString = configuration.get(TABLE);
    +    if (encodedString != null) {
    +      byte[] bytes = (byte[]) ObjectSerializationUtil.convertStringToObject(encodedString);
    +      TableInfo tableInfo = TableInfo.deserialize(bytes);
    +      carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
    +    }
    +    return carbonTable;
    +  }
    +
    +  public static void setLoadModel(Configuration configuration, CarbonLoadModel loadModel)
    +      throws IOException {
    +    if (loadModel != null) {
    +      configuration.set(LOAD_MODEL, ObjectSerializationUtil.convertObjectToString(loadModel));
    +    }
    +  }
    +
    +  public static void setInputSchema(Configuration configuration, StructType inputSchema)
    +      throws IOException {
    +    if (inputSchema != null && inputSchema.getFields().size() > 0) {
    +      configuration.set(INPUT_SCHEMA, ObjectSerializationUtil.convertObjectToString(inputSchema));
    +    } else {
    +      throw new UnsupportedOperationException("Input schema must be set");
    +    }
    +  }
    +
    +  private static StructType getInputSchema(Configuration configuration) throws IOException {
    +    String encodedString = configuration.get(INPUT_SCHEMA);
    +    if (encodedString != null) {
    +      return (StructType) ObjectSerializationUtil.convertStringToObject(encodedString);
    +    }
         return null;
       }
    +
    +  public static boolean isOverwriteSet(Configuration configuration) {
    +    String overwrite = configuration.get(OVERWRITE_SET);
    +    if (overwrite != null) {
    +      return Boolean.parseBoolean(overwrite);
    +    }
    +    return false;
    +  }
    +
    +  public static void setOverwrite(Configuration configuration, boolean overwrite) {
    +    configuration.set(OVERWRITE_SET, String.valueOf(overwrite));
    +  }
    +
    +  public static void setTempStoreLocations(Configuration configuration, String[] tempLocations)
    +      throws IOException {
    +    if (tempLocations != null && tempLocations.length > 0) {
    +      configuration
    +          .set(TEMP_STORE_LOCATIONS, ObjectSerializationUtil.convertObjectToString(tempLocations));
    +    }
    +  }
    +
    +  private static String[] getTempStoreLocations(TaskAttemptContext taskAttemptContext)
    +      throws IOException {
    +    String encodedString = taskAttemptContext.getConfiguration().get(TEMP_STORE_LOCATIONS);
    +    if (encodedString != null) {
    +      return (String[]) ObjectSerializationUtil.convertStringToObject(encodedString);
    +    }
    +    return new String[] {
    +        System.getProperty("java.io.tmpdir") + "/" + System.nanoTime() + "_" + taskAttemptContext
    +            .getTaskAttemptID().toString() };
    +  }
    +
    +  @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context)
    +      throws IOException {
    +    if (this.committer == null) {
    +      Path output = getOutputPath(context);
    +      this.committer = new CarbonOutputCommitter(output, context);
    +    }
    +
    +    return this.committer;
    +  }
    +
    +  @Override public RecordWriter<NullWritable, StringArrayWritable> getRecordWriter(
    --- End diff --
   
    ok



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1642: [CARBONDATA-1855][PARTITION] Added outputform...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1642#discussion_r157333626
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java ---
    @@ -18,22 +18,311 @@
     package org.apache.carbondata.hadoop.api;
     
     import java.io.IOException;
    +import java.util.List;
    +import java.util.Random;
     
    -import org.apache.hadoop.fs.FileSystem;
    -import org.apache.hadoop.mapred.FileOutputFormat;
    -import org.apache.hadoop.mapred.JobConf;
    -import org.apache.hadoop.mapred.RecordWriter;
    -import org.apache.hadoop.util.Progressable;
    +import org.apache.carbondata.common.CarbonIterator;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
    +import org.apache.carbondata.core.metadata.datatype.StructField;
    +import org.apache.carbondata.core.metadata.datatype.StructType;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.TableInfo;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
    +import org.apache.carbondata.processing.loading.DataLoadExecutor;
    +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
    +import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper;
    +import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.NullWritable;
    +import org.apache.hadoop.mapreduce.OutputCommitter;
    +import org.apache.hadoop.mapreduce.RecordWriter;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     
     /**
      * Base class for all output format for CarbonData file.
    - * @param <T>
      */
    -public abstract class CarbonTableOutputFormat<T> extends FileOutputFormat<Void, T> {
    +public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, StringArrayWritable> {
    +
    +  private static final String LOAD_MODEL = "mapreduce.carbonoutputformat.load.mmodel";
    +  private static final String DATABASE_NAME = "mapreduce.carbonoutputformat.databaseName";
    +  private static final String TABLE_NAME = "mapreduce.carbonoutputformat.tableName";
    +  private static final String TABLE = "mapreduce.carbonoutputformat.table";
    +  private static final String TABLE_PATH = "mapreduce.carbonoutputformat.tablepath";
    +  private static final String INPUT_SCHEMA = "mapreduce.carbonoutputformat.inputschema";
    +  private static final String TEMP_STORE_LOCATIONS = "carbon.load.tempstore.locations";
    +  private static final String OVERWRITE_SET = "carbon.load.set.overwrite";
    +  public static final String COMPLEX_DELIMITERS = "mapreduce.carbonoutputformat.complex_delimiters";
    +  public static final String SERIALIZATION_NULL_FORMAT =
    +      "mapreduce.carbonoutputformat.serialization.null.format";
    +  public static final String BAD_RECORDS_LOGGER_ENABLE =
    +      "mapreduce.carbonoutputformat.bad.records.logger.enable";
    +  public static final String BAD_RECORDS_LOGGER_ACTION =
    +      "mapreduce.carbonoutputformat.bad.records.logger.action";
    +  public static final String IS_EMPTY_DATA_BAD_RECORD =
    +      "mapreduce.carbonoutputformat.empty.data.bad.record";
    +  public static final String SKIP_EMPTY_LINE = "mapreduce.carbonoutputformat.skip.empty.line";
    +  public static final String SORT_SCOPE = "mapreduce.carbonoutputformat.load.sort.scope";
    +  public static final String BATCH_SORT_SIZE_INMB =
    +      "mapreduce.carbonoutputformat.batch.sort.size.inmb";
    +  public static final String GLOBAL_SORT_PARTITIONS =
    +      "mapreduce.carbonoutputformat.global.sort.partitions";
    +  public static final String BAD_RECORD_PATH = "mapreduce.carbonoutputformat.bad.record.path";
    +  public static final String DATE_FORMAT = "mapreduce.carbonoutputformat.date.format";
    +  public static final String TIMESTAMP_FORMAT = "mapreduce.carbonoutputformat.timestamp.format";
    +  public static final String IS_ONE_PASS_LOAD = "mapreduce.carbonoutputformat.one.pass.load";
    +  public static final String DICTIONARY_SERVER_HOST =
    +      "mapreduce.carbonoutputformat.dict.server.host";
    +  public static final String DICTIONARY_SERVER_PORT =
    +      "mapreduce.carbonoutputformat.dict.server.port";
    +
    +  private CarbonOutputCommitter committer;
    +
    +  public static void setDatabaseName(Configuration configuration, String databaseName) {
    +    if (null != databaseName) {
    +      configuration.set(DATABASE_NAME, databaseName);
    +    }
    +  }
    +
    +  public static String getDatabaseName(Configuration configuration) {
    +    return configuration.get(DATABASE_NAME);
    +  }
    +
    +  public static void setTableName(Configuration configuration, String tableName) {
    +    if (null != tableName) {
    +      configuration.set(TABLE_NAME, tableName);
    +    }
    +  }
    +
    +  public static String getTableName(Configuration configuration) {
    +    return configuration.get(TABLE_NAME);
    +  }
    +
    +  public static void setTablePath(Configuration configuration, String tablePath) {
    +    if (null != tablePath) {
    +      configuration.set(TABLE_PATH, tablePath);
    +    }
    +  }
    +
    +  public static String getTablePath(Configuration configuration) {
    +    return configuration.get(TABLE_PATH);
    +  }
     
    -  @Override
    -  public RecordWriter<Void, T> getRecordWriter(FileSystem ignored, JobConf job, String name,
    -      Progressable progress) throws IOException {
    +  public static void setCarbonTable(Configuration configuration, CarbonTable carbonTable)
    +      throws IOException {
    +    if (carbonTable != null) {
    +      configuration.set(TABLE,
    +          ObjectSerializationUtil.convertObjectToString(carbonTable.getTableInfo().serialize()));
    +    }
    +  }
    +
    +  public static CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
    +    CarbonTable carbonTable = null;
    +    String encodedString = configuration.get(TABLE);
    +    if (encodedString != null) {
    +      byte[] bytes = (byte[]) ObjectSerializationUtil.convertStringToObject(encodedString);
    +      TableInfo tableInfo = TableInfo.deserialize(bytes);
    +      carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
    +    }
    +    return carbonTable;
    +  }
    +
    +  public static void setLoadModel(Configuration configuration, CarbonLoadModel loadModel)
    +      throws IOException {
    +    if (loadModel != null) {
    +      configuration.set(LOAD_MODEL, ObjectSerializationUtil.convertObjectToString(loadModel));
    +    }
    +  }
    +
    +  public static void setInputSchema(Configuration configuration, StructType inputSchema)
    +      throws IOException {
    +    if (inputSchema != null && inputSchema.getFields().size() > 0) {
    +      configuration.set(INPUT_SCHEMA, ObjectSerializationUtil.convertObjectToString(inputSchema));
    +    } else {
    +      throw new UnsupportedOperationException("Input schema must be set");
    +    }
    +  }
    +
    +  private static StructType getInputSchema(Configuration configuration) throws IOException {
    +    String encodedString = configuration.get(INPUT_SCHEMA);
    +    if (encodedString != null) {
    +      return (StructType) ObjectSerializationUtil.convertStringToObject(encodedString);
    +    }
         return null;
       }
    +
    +  public static boolean isOverwriteSet(Configuration configuration) {
    +    String overwrite = configuration.get(OVERWRITE_SET);
    +    if (overwrite != null) {
    +      return Boolean.parseBoolean(overwrite);
    +    }
    +    return false;
    +  }
    +
    +  public static void setOverwrite(Configuration configuration, boolean overwrite) {
    +    configuration.set(OVERWRITE_SET, String.valueOf(overwrite));
    +  }
    +
    +  public static void setTempStoreLocations(Configuration configuration, String[] tempLocations)
    +      throws IOException {
    +    if (tempLocations != null && tempLocations.length > 0) {
    +      configuration
    +          .set(TEMP_STORE_LOCATIONS, ObjectSerializationUtil.convertObjectToString(tempLocations));
    +    }
    +  }
    +
    +  private static String[] getTempStoreLocations(TaskAttemptContext taskAttemptContext)
    +      throws IOException {
    +    String encodedString = taskAttemptContext.getConfiguration().get(TEMP_STORE_LOCATIONS);
    +    if (encodedString != null) {
    +      return (String[]) ObjectSerializationUtil.convertStringToObject(encodedString);
    +    }
    +    return new String[] {
    +        System.getProperty("java.io.tmpdir") + "/" + System.nanoTime() + "_" + taskAttemptContext
    +            .getTaskAttemptID().toString() };
    +  }
    +
    +  @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context)
    +      throws IOException {
    +    if (this.committer == null) {
    +      Path output = getOutputPath(context);
    +      this.committer = new CarbonOutputCommitter(output, context);
    +    }
    +
    +    return this.committer;
    +  }
    +
    +  @Override public RecordWriter<NullWritable, StringArrayWritable> getRecordWriter(
    +      TaskAttemptContext taskAttemptContext) throws IOException {
    +    final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration());
    +    loadModel.setTaskNo(new Random().nextInt(Integer.MAX_VALUE) + "");
    --- End diff --
   
    Now I take the task ID from the task attempt context instead of generating a random number.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1642: [CARBONDATA-1855][PARTITION] Added outputform...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1642#discussion_r157333697
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.processing.loading.iterator;
    +
    +import java.util.concurrent.ArrayBlockingQueue;
    +
    +import org.apache.carbondata.common.CarbonIterator;
    +
    +public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> {
    +
    +  private boolean close = false;
    +
    +  /**
    +   * Number of rows kept in memory at most will be batchSize * queue size
    +   */
    +  private int batchSize = 1000;
    +
    +  private RowBatch loadBatch = new RowBatch(batchSize);
    +
    +  private RowBatch readBatch;
    +
    +  private ArrayBlockingQueue<RowBatch> queue = new ArrayBlockingQueue<>(10);
    +
    +  public void write(String[] row) throws InterruptedException {
    +
    +    if (!loadBatch.addRow(row)) {
    +      loadBatch.readyRead();
    +      queue.put(loadBatch);
    +      loadBatch = new RowBatch(batchSize);
    +    }
    +  }
    +
    +  @Override public boolean hasNext() {
    +    return !queue.isEmpty() || !close || readBatch != null && readBatch.hasNext();
    +  }
    +
    +  @Override public String[] next() {
    +    if (readBatch == null || !readBatch.hasNext()) {
    +      try {
    +        readBatch = queue.take();
    +      } catch (InterruptedException e) {
    +        throw new RuntimeException(e);
    +      }
    +    }
    +    return readBatch.next();
    +  }
    +
    +  @Override public void close() {
    +    if (loadBatch.isLoading()) {
    +      try {
    +        loadBatch.readyRead();
    +        queue.put(loadBatch);
    +      } catch (InterruptedException e) {
    +        throw new RuntimeException(e);
    +      }
    +    }
    +    close = true;
    +  }
    +
    +  private static class RowBatch extends CarbonIterator<String[]> {
    +
    +    private int counter;
    +
    +    private String[][] batch;
    +
    +    private int size;
    +
    +    private boolean isLoading = true;
    +
    +    public RowBatch(int size) {
    +      batch = new String[size][];
    +      this.size = size;
    +    }
    +
    +    public boolean addRow(String[] row) {
    --- End diff --
   
    ok


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1642: [CARBONDATA-1855][PARTITION] Added outputform...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1642#discussion_r157333728
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.processing.loading.iterator;
    +
    +import java.util.concurrent.ArrayBlockingQueue;
    +
    +import org.apache.carbondata.common.CarbonIterator;
    +
    +public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> {
    +
    +  private boolean close = false;
    +
    +  /**
    +   * Number of rows kept in memory at most will be batchSize * queue size
    +   */
    +  private int batchSize = 1000;
    +
    +  private RowBatch loadBatch = new RowBatch(batchSize);
    +
    +  private RowBatch readBatch;
    +
    +  private ArrayBlockingQueue<RowBatch> queue = new ArrayBlockingQueue<>(10);
    +
    +  public void write(String[] row) throws InterruptedException {
    +
    +    if (!loadBatch.addRow(row)) {
    +      loadBatch.readyRead();
    +      queue.put(loadBatch);
    +      loadBatch = new RowBatch(batchSize);
    +    }
    +  }
    +
    +  @Override public boolean hasNext() {
    +    return !queue.isEmpty() || !close || readBatch != null && readBatch.hasNext();
    +  }
    +
    +  @Override public String[] next() {
    +    if (readBatch == null || !readBatch.hasNext()) {
    +      try {
    +        readBatch = queue.take();
    +      } catch (InterruptedException e) {
    +        throw new RuntimeException(e);
    +      }
    +    }
    +    return readBatch.next();
    +  }
    +
    +  @Override public void close() {
    +    if (loadBatch.isLoading()) {
    +      try {
    +        loadBatch.readyRead();
    +        queue.put(loadBatch);
    +      } catch (InterruptedException e) {
    +        throw new RuntimeException(e);
    +      }
    +    }
    +    close = true;
    +  }
    +
    +  private static class RowBatch extends CarbonIterator<String[]> {
    +
    +    private int counter;
    +
    +    private String[][] batch;
    +
    +    private int size;
    +
    +    private boolean isLoading = true;
    +
    +    public RowBatch(int size) {
    --- End diff --
   
    ok


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1642: [CARBONDATA-1855][PARTITION] Added outputform...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1642#discussion_r157333850
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---
    @@ -329,6 +329,47 @@ public static String readCurrentTime() {
         return date;
       }
     
    +  public static void readAndUpdateLoadProgressInTableMeta(CarbonLoadModel model,
    +      boolean insertOverwrite) throws IOException, InterruptedException {
    +    LoadMetadataDetails newLoadMetaEntry = new LoadMetadataDetails();
    +    SegmentStatus status = SegmentStatus.INSERT_IN_PROGRESS;
    +    if (insertOverwrite) {
    +      status = SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS;
    +    }
    +
    +    // reading the start time of data load.
    +    long loadStartTime = CarbonUpdateUtil.readCurrentTime();
    +    model.setFactTimeStamp(loadStartTime);
    +    CarbonLoaderUtil
    +        .populateNewLoadMetaEntry(newLoadMetaEntry, status, model.getFactTimeStamp(), false);
    +    boolean entryAdded =
    +        CarbonLoaderUtil.recordNewLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite);
    +    if (!entryAdded) {
    +      throw new IOException("Failed to add entry in table status for " + model.getTableName());
    +    }
    +  }
    +
    +  /**
    +   * This method will update the load failure entry in the table status file
    +   */
    +  public static void updateTableStatusForFailure(CarbonLoadModel model)
    +      throws IOException, InterruptedException {
    +    // in case if failure the load status should be "Marked for delete" so that it will be taken
    +    // care during clean up
    +    SegmentStatus loadStatus = SegmentStatus.MARKED_FOR_DELETE;
    +    // always the last entry in the load metadata details will be the current load entry
    +    LoadMetadataDetails loadMetaEntry =
    +        model.getLoadMetadataDetails().get(model.getLoadMetadataDetails().size() - 1);
    --- End diff --
   
    OK, I added a new method.


---
1234