[GitHub] carbondata pull request #2384: [CARBONDATA-2608] SDK Support JSON data loadi...

62 messages

[GitHub] carbondata pull request #2384: [CARBONDATA-2608] SDK Support JSON data loadi...

qiuchenjian-2
GitHub user ajantha-bhat opened a pull request:

    https://github.com/apache/carbondata/pull/2384

    [CARBONDATA-2608] SDK Support JSON data loading directly (without AVRO conversion)

    What changes were proposed in this pull request?
    Currently the SDK supports JSON data loading only through AVRO.
    Converting JSON to an Avro record and then the Avro record to a Carbon object is a two-step process, so there is a need for a new CarbonWriter that works with JSON directly, without AVRO.
    This PR implements that.
   
    Highlights:
    Works with just the JSON data and a Carbon schema.
    Supports reading multiple JSON files in a folder.
    Supports single-row JSON writes.
   
    How was this patch tested?
   
    Manual testing; UTs are added in another PR.
   
    Be sure to do all of the following checklist to help us incorporate
    your contribution quickly and easily:
   
     - [ ] Any interfaces changed? NA
     
     - [ ] Any backward compatibility impacted? NA
     
     - [ ] Document update required? Yes, will be handled in separate PR
   
     - [ ] Testing done
    Yes, updated the UT.      
     - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.  NA
   


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ajantha-bhat/carbondata issue_fix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2384.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2384
   
----
commit 0c99d11c68d681f15c051d8c8e3ded5ced8b1708
Author: ajantha-bhat <ajanthabhat@...>
Date:   2018-06-15T10:21:16Z

    JsonCarbonWrtier

----


---

[GitHub] carbondata issue #2384: [CARBONDATA-2608] SDK Support JSON data loading dire...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2384
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6393/



---

[GitHub] carbondata issue #2384: [CARBONDATA-2608] SDK Support JSON data loading dire...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2384
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5227/



---

[GitHub] carbondata issue #2384: [CARBONDATA-2608] SDK Support JSON data loading dire...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2384
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5338/



---

[GitHub] carbondata issue #2384: [CARBONDATA-2608] SDK Support JSON data loading dire...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2384
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5339/



---

[GitHub] carbondata issue #2384: [CARBONDATA-2608] SDK Support JSON data loading dire...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2384
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6395/



---

[GitHub] carbondata issue #2384: [CARBONDATA-2608] SDK Support JSON data loading dire...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2384
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5341/



---

[GitHub] carbondata issue #2384: [CARBONDATA-2608] SDK Support JSON data loading dire...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2384
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5229/



---

[GitHub] carbondata issue #2384: [CARBONDATA-2608] SDK Support JSON data loading dire...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2384
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6456/



---

[GitHub] carbondata issue #2384: [CARBONDATA-2608] SDK Support JSON data loading dire...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2384
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5287/



---

[GitHub] carbondata issue #2384: [CARBONDATA-2608] SDK Support JSON data loading dire...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2384
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5377/



---

[GitHub] carbondata issue #2384: [CARBONDATA-2608] SDK Support JSON data loading dire...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2384
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5378/



---

[GitHub] carbondata issue #2384: [CARBONDATA-2608] SDK Support JSON data loading dire...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ajantha-bhat commented on the issue:

    https://github.com/apache/carbondata/pull/2384
 
    retest this please


---

[GitHub] carbondata pull request #2384: [CARBONDATA-2608] SDK Support JSON data loadi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2384#discussion_r197346144
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---
    @@ -3116,5 +3117,48 @@ public static void setLocalDictColumnsToWrapperSchema(List<ColumnSchema> columns
           }
         }
       }
    -}
     
    +  /**
    +   * Utility function to read a whole file as a string,
    +   * Must not use this if the file is very huge. As it may result in memory exhaustion.
    +   * @param filePath
    +   * @return
    +   */
    +  public static String readFromFile(String filePath) {
    --- End diff --
   
    If it is only used for test case purpose, move these to test cases only.


---

[GitHub] carbondata pull request #2384: [CARBONDATA-2608] SDK Support JSON data loadi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2384#discussion_r197346188
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---
    @@ -3116,5 +3117,48 @@ public static void setLocalDictColumnsToWrapperSchema(List<ColumnSchema> columns
           }
         }
       }
    -}
     
    +  /**
    +   * Utility function to read a whole file as a string,
    +   * Must not use this if the file is very huge. As it may result in memory exhaustion.
    +   * @param filePath
    +   * @return
    +   */
    +  public static String readFromFile(String filePath) {
    +    File file = new File(filePath);
    +    URI uri = file.toURI();
    +    byte[] bytes;
    +    try {
    +      bytes = java.nio.file.Files.readAllBytes(java.nio.file.Paths.get(uri));
    +    } catch (IOException e) {
    +      e.printStackTrace();
    +      return "ERROR loading file " + filePath;
    +    }
    +    try {
    +      return new String(bytes, "UTF-8");
    +    } catch (UnsupportedEncodingException e) {
    +      return "ERROR while encoding to UTF-8 for file " + filePath;
    +    }
    +  }
    +
    +  /**
    +   * get list of file paths with requested extensions from the path
    +   * @param carbonFile
    +   * @param jsonFileList
    +   * @param fileExtension
    +   */
    +  public static void getFileList(CarbonFile carbonFile, List<String> jsonFileList,
    --- End diff --
   
    It is unused method, please remove it
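The `readFromFile` utility quoted in the diff above swallows failures by returning error strings as data. A minimal stdlib sketch of the same helper that propagates the failure to the caller instead (the class name `FileTextReader` is made up for illustration; this is not the PR's code):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

class FileTextReader {
  /**
   * Reads a whole file as a UTF-8 string. Must not be used on very large
   * files, since the entire content is held in memory. I/O errors are
   * propagated instead of being encoded into the returned string.
   */
  public static String readFromFile(String filePath) throws IOException {
    byte[] bytes = Files.readAllBytes(Paths.get(filePath));
    return new String(bytes, StandardCharsets.UTF_8);
  }
}
```

With this shape, a caller (for example a test case, as the review suggests) decides how to handle the `IOException` rather than string-matching an error message.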


---

[GitHub] carbondata pull request #2384: [CARBONDATA-2608] SDK Support JSON data loadi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2384#discussion_r197347008
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/jsoninput/JsonInputFormat.java ---
    @@ -0,0 +1,285 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.loading.jsoninput;
    +
    +import java.io.BufferedInputStream;
    +import java.io.IOException;
    +import java.security.InvalidParameterException;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.LongWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.Job;
    +import org.apache.hadoop.mapreduce.RecordReader;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    +import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    +import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
    +import org.apache.log4j.Logger;
    +import org.codehaus.jackson.JsonFactory;
    +import org.codehaus.jackson.JsonNode;
    +import org.codehaus.jackson.JsonParseException;
    +import org.codehaus.jackson.map.JsonMappingException;
    +import org.codehaus.jackson.map.ObjectMapper;
    +
    +/**
    + * Code ported from Hydra-Spark {package com.pluralsight.hydra.hadoop.io} package
    + * The JsonInputFormat will read two types of JSON formatted data. The default
    + * expectation is each JSON record is newline delimited. This method is
    + * generally faster and is backed by the {@link LineRecordReader} you are likely
    + * familiar with. The other method is 'pretty print' of JSON records, where
    + * records span multiple lines and often have some type of root identifier. This
    + * method is likely slower, but respects record boundaries much like the
    + * LineRecordReader.<br>
    + * <br>
    + * Use of the 'pretty print' reader requires a record identifier.
    + */
    +public class JsonInputFormat extends FileInputFormat<LongWritable, Text> {
    +
    +  private static JsonFactory factory = new JsonFactory();
    +
    +  private static ObjectMapper mapper = new ObjectMapper(factory);
    +
    +  public static final String ONE_RECORD_PER_LINE = "json.input.format.one.record.per.line";
    +
    +  public static final String RECORD_IDENTIFIER = "json.input.format.record.identifier";
    +
    +  @Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
    +      TaskAttemptContext context) throws IOException, InterruptedException {
    +    RecordReader<LongWritable, Text> rdr;
    +    if (context.getConfiguration().getBoolean(ONE_RECORD_PER_LINE, false)) {
    --- End diff --
   
    use `getOneRecordPerLine` method here
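The helper the reviewer suggests would wrap the raw configuration lookups behind named accessors instead of repeating key strings and defaults at each call site. A rough sketch of that shape, with `java.util.Properties` standing in for Hadoop's `Configuration` so the snippet is self-contained (everything beyond the two key constants and the method names `getOneRecordPerLine`/`getRecordIdentifier` is assumed, not taken from the PR):

```java
import java.util.Properties;

class JsonInputFormatConfig {
  public static final String ONE_RECORD_PER_LINE = "json.input.format.one.record.per.line";
  public static final String RECORD_IDENTIFIER = "json.input.format.record.identifier";

  // True when each JSON record occupies exactly one line; defaults to false.
  public static boolean getOneRecordPerLine(Properties conf) {
    return Boolean.parseBoolean(conf.getProperty(ONE_RECORD_PER_LINE, "false"));
  }

  // Root identifier for 'pretty print' records; null when not configured.
  public static String getRecordIdentifier(Properties conf) {
    return conf.getProperty(RECORD_IDENTIFIER);
  }
}
```

`createRecordReader` would then branch on `getOneRecordPerLine(conf)` rather than calling `getBoolean(ONE_RECORD_PER_LINE, false)` inline.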


---

[GitHub] carbondata pull request #2384: [CARBONDATA-2608] SDK Support JSON data loadi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2384#discussion_r197347203
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/jsoninput/JsonInputFormat.java ---
    @@ -0,0 +1,285 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.loading.jsoninput;
    +
    +import java.io.BufferedInputStream;
    +import java.io.IOException;
    +import java.security.InvalidParameterException;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.LongWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.Job;
    +import org.apache.hadoop.mapreduce.RecordReader;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    +import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    +import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
    +import org.apache.log4j.Logger;
    +import org.codehaus.jackson.JsonFactory;
    +import org.codehaus.jackson.JsonNode;
    +import org.codehaus.jackson.JsonParseException;
    +import org.codehaus.jackson.map.JsonMappingException;
    +import org.codehaus.jackson.map.ObjectMapper;
    +
    +/**
    + * Code ported from Hydra-Spark {package com.pluralsight.hydra.hadoop.io} package
    + * The JsonInputFormat will read two types of JSON formatted data. The default
    + * expectation is each JSON record is newline delimited. This method is
    + * generally faster and is backed by the {@link LineRecordReader} you are likely
    + * familiar with. The other method is 'pretty print' of JSON records, where
    + * records span multiple lines and often have some type of root identifier. This
    + * method is likely slower, but respects record boundaries much like the
    + * LineRecordReader.<br>
    + * <br>
    + * Use of the 'pretty print' reader requires a record identifier.
    + */
    +public class JsonInputFormat extends FileInputFormat<LongWritable, Text> {
    +
    +  private static JsonFactory factory = new JsonFactory();
    +
    +  private static ObjectMapper mapper = new ObjectMapper(factory);
    +
    +  public static final String ONE_RECORD_PER_LINE = "json.input.format.one.record.per.line";
    +
    +  public static final String RECORD_IDENTIFIER = "json.input.format.record.identifier";
    +
    +  @Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
    +      TaskAttemptContext context) throws IOException, InterruptedException {
    +    RecordReader<LongWritable, Text> rdr;
    +    if (context.getConfiguration().getBoolean(ONE_RECORD_PER_LINE, false)) {
    +      rdr = new SimpleJsonRecordReader();
    +    } else {
    +      return new JsonRecordReader();
    +    }
    +    rdr.initialize(split, context);
    +    return rdr;
    +  }
    +
    +  /**
    +   * This class uses the {@link LineRecordReader} to read a line of JSON and
    +   * return it as a Text object.
    +   */
    +  public static class SimpleJsonRecordReader extends RecordReader<LongWritable, Text> {
    +
    +    private LineRecordReader rdr = null;
    +
    +    private LongWritable outkey = new LongWritable(0L);
    +
    +    private Text outvalue = new Text();
    +
    +    @Override public void initialize(InputSplit split, TaskAttemptContext context)
    +        throws IOException, InterruptedException {
    +
    +      rdr = new LineRecordReader();
    +      rdr.initialize(split, context);
    +    }
    +
    +    @Override public boolean nextKeyValue() throws IOException, InterruptedException {
    +      if (rdr.nextKeyValue()) {
    +        outvalue.set(rdr.getCurrentValue());
    +        outkey.set(rdr.getCurrentKey().get());
    +        return true;
    +      } else {
    +        return false;
    +      }
    +    }
    +
    +    @Override public void close() throws IOException {
    +      rdr.close();
    +    }
    +
    +    @Override public float getProgress() throws IOException {
    +      return rdr.getProgress();
    +    }
    +
    +    @Override public LongWritable getCurrentKey() throws IOException, InterruptedException {
    +      return outkey;
    +    }
    +
    +    @Override public Text getCurrentValue() throws IOException, InterruptedException {
    +      return outvalue;
    +    }
    +  }
    +
    +  /**
    +   * This class uses the {@link JsonStreamReader} to read JSON records from a
    +   * file. It respects split boundaries to complete full JSON records, as
    +   * specified by the root identifier. This class will discard any records
    +   * that it was unable to decode using
    +   * {@link JsonInputFormat#decodeLineToJsonNode(String)}
    +   */
    +  public static class JsonRecordReader extends RecordReader<LongWritable, Text> {
    +
    +    private Logger LOG = Logger.getLogger(JsonRecordReader.class);
    +
    +    private JsonStreamReader rdr = null;
    +
    +    private long start = 0, end = 0;
    +
    +    private float toRead = 0;
    +
    +    private String identifier = null;
    +
    +    private Logger log = Logger.getLogger(JsonRecordReader.class);
    +
    +    private Text outJson = new Text();
    +
    +    private LongWritable outKey = new LongWritable();
    +
    +    @Override public void initialize(InputSplit split, TaskAttemptContext context)
    +        throws IOException, InterruptedException {
    +
    +      this.identifier = context.getConfiguration().get(RECORD_IDENTIFIER);
    --- End diff --
   
    Use `getRecordIdentifier` here


---

[GitHub] carbondata pull request #2384: [CARBONDATA-2608] SDK Support JSON data loadi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2384#discussion_r197349384
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/jsoninput/JsonInputFormat.java ---
    @@ -0,0 +1,285 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.loading.jsoninput;
    +
    +import java.io.BufferedInputStream;
    +import java.io.IOException;
    +import java.security.InvalidParameterException;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.LongWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.Job;
    +import org.apache.hadoop.mapreduce.RecordReader;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    +import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    +import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
    +import org.apache.log4j.Logger;
    +import org.codehaus.jackson.JsonFactory;
    +import org.codehaus.jackson.JsonNode;
    +import org.codehaus.jackson.JsonParseException;
    +import org.codehaus.jackson.map.JsonMappingException;
    +import org.codehaus.jackson.map.ObjectMapper;
    +
    +/**
    + * Code ported from Hydra-Spark {package com.pluralsight.hydra.hadoop.io} package
    + * The JsonInputFormat will read two types of JSON formatted data. The default
    + * expectation is each JSON record is newline delimited. This method is
    + * generally faster and is backed by the {@link LineRecordReader} you are likely
    + * familiar with. The other method is 'pretty print' of JSON records, where
    + * records span multiple lines and often have some type of root identifier. This
    + * method is likely slower, but respects record boundaries much like the
    + * LineRecordReader.<br>
    + * <br>
    + * Use of the 'pretty print' reader requires a record identifier.
    + */
    +public class JsonInputFormat extends FileInputFormat<LongWritable, Text> {
    +
    +  private static JsonFactory factory = new JsonFactory();
    +
    +  private static ObjectMapper mapper = new ObjectMapper(factory);
    +
    +  public static final String ONE_RECORD_PER_LINE = "json.input.format.one.record.per.line";
    +
    +  public static final String RECORD_IDENTIFIER = "json.input.format.record.identifier";
    +
    +  @Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
    +      TaskAttemptContext context) throws IOException, InterruptedException {
    +    RecordReader<LongWritable, Text> rdr;
    +    if (context.getConfiguration().getBoolean(ONE_RECORD_PER_LINE, false)) {
    +      rdr = new SimpleJsonRecordReader();
    +    } else {
    +      return new JsonRecordReader();
    +    }
    +    rdr.initialize(split, context);
    +    return rdr;
    +  }
    +
    +  /**
    +   * This class uses the {@link LineRecordReader} to read a line of JSON and
    +   * return it as a Text object.
    +   */
    +  public static class SimpleJsonRecordReader extends RecordReader<LongWritable, Text> {
    +
    +    private LineRecordReader rdr = null;
    +
    +    private LongWritable outkey = new LongWritable(0L);
    +
    +    private Text outvalue = new Text();
    +
    +    @Override public void initialize(InputSplit split, TaskAttemptContext context)
    +        throws IOException, InterruptedException {
    +
    +      rdr = new LineRecordReader();
    +      rdr.initialize(split, context);
    +    }
    +
    +    @Override public boolean nextKeyValue() throws IOException, InterruptedException {
    +      if (rdr.nextKeyValue()) {
    +        outvalue.set(rdr.getCurrentValue());
    +        outkey.set(rdr.getCurrentKey().get());
    +        return true;
    +      } else {
    +        return false;
    +      }
    +    }
    +
    +    @Override public void close() throws IOException {
    +      rdr.close();
    +    }
    +
    +    @Override public float getProgress() throws IOException {
    +      return rdr.getProgress();
    +    }
    +
    +    @Override public LongWritable getCurrentKey() throws IOException, InterruptedException {
    +      return outkey;
    +    }
    +
    +    @Override public Text getCurrentValue() throws IOException, InterruptedException {
    +      return outvalue;
    +    }
    +  }
    +
    +  /**
    +   * This class uses the {@link JsonStreamReader} to read JSON records from a
    +   * file. It respects split boundaries to complete full JSON records, as
    +   * specified by the root identifier. This class will discard any records
    +   * that it was unable to decode using
    +   * {@link JsonInputFormat#decodeLineToJsonNode(String)}
    +   */
    +  public static class JsonRecordReader extends RecordReader<LongWritable, Text> {
    +
    +    private Logger LOG = Logger.getLogger(JsonRecordReader.class);
    +
    +    private JsonStreamReader rdr = null;
    +
    +    private long start = 0, end = 0;
    +
    +    private float toRead = 0;
    +
    +    private String identifier = null;
    +
    +    private Logger log = Logger.getLogger(JsonRecordReader.class);
    +
    +    private Text outJson = new Text();
    +
    +    private LongWritable outKey = new LongWritable();
    +
    +    @Override public void initialize(InputSplit split, TaskAttemptContext context)
    +        throws IOException, InterruptedException {
    +
    +      this.identifier = context.getConfiguration().get(RECORD_IDENTIFIER);
    +
    +      if (this.identifier == null || identifier.isEmpty()) {
    +        throw new InvalidParameterException(JsonInputFormat.RECORD_IDENTIFIER + " is not set.");
    +      } else {
    +        LOG.info("Initializing JsonRecordReader with identifier " + identifier);
    +      }
    +
    +      FileSplit fSplit = (FileSplit) split;
    +
    +      // get relevant data
    +      Path file = fSplit.getPath();
    +
    +      log.info("File is " + file);
    +
    +      start = fSplit.getStart();
    +      end = start + split.getLength();
    +      toRead = end - start;
    +
    +      FSDataInputStream strm = FileSystem.get(context.getConfiguration()).open(file);
    +
    +      if (start != 0) {
    +        strm.seek(start);
    +      }
    +
    +      rdr = new JsonStreamReader(identifier, new BufferedInputStream(strm));
    +    }
    +
    +    @Override public boolean nextKeyValue() throws IOException, InterruptedException {
    +      boolean retval = false;
    +      boolean keepGoing = false;
    +      do {
    +        keepGoing = false;
    +        String record = rdr.getJsonRecord();
    +        if (record != null) {
    +          if (JsonInputFormat.decodeLineToJsonNode(record) == null) {
    +            keepGoing = true;
    +          } else {
    +            outJson.set(record);
    +            outKey.set(rdr.getBytesRead());
    +            retval = true;
    +          }
    +        }
    +      } while (keepGoing);
    +
    +      return retval;
    +    }
    +
    +    @Override public void close() throws IOException {
    +      rdr.close();
    +    }
    +
    +    @Override public float getProgress() throws IOException {
    +      return (float) rdr.getBytesRead() / toRead;
    +    }
    +
    +    @Override public LongWritable getCurrentKey() throws IOException, InterruptedException {
    +      return outKey;
    +    }
    +
    +    @Override public Text getCurrentValue() throws IOException, InterruptedException {
    +      return outJson;
    +    }
    +  }
    +
    +  /**
    +   * Decodes a given string of text to a {@link JsonNode}.
    +   *
    +   * @param line The line of text
    +   * @return The JsonNode or null if a JsonParseException,
    +   * JsonMappingException, or IOException error occurs
    +   */
    +  public static synchronized JsonNode decodeLineToJsonNode(String line) {
    --- End diff --
   
    static is required here?
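The `synchronized` on the static `decodeLineToJsonNode` serializes every record reader on the one shared `ObjectMapper`. A common alternative is a `ThreadLocal` instance per thread, which removes the lock without sharing mutable state. Sketched here with a stdlib non-thread-safe class (`SimpleDateFormat`) standing in for the mapper, since Jackson is not available in a self-contained snippet; the null-on-parse-error contract mirrors `decodeLineToJsonNode`:

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

class ThreadLocalParser {
  // One parser instance per thread: no lock contention, no shared mutable state.
  private static final ThreadLocal<SimpleDateFormat> FORMAT =
      ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));

  // Returns the parsed value, or null if the input cannot be decoded.
  public static Date parseOrNull(String line) {
    try {
      return FORMAT.get().parse(line);
    } catch (ParseException e) {
      return null;
    }
  }
}
```

Whether the pattern is needed for the PR's mapper depends on the thread-safety guarantees of the Jackson version in use, which is exactly what the review question probes.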


---

[GitHub] carbondata pull request #2384: [CARBONDATA-2608] SDK Support JSON data loadi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2384#discussion_r197349453
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/jsoninput/JsonInputFormat.java ---
    @@ -0,0 +1,285 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.loading.jsoninput;
    +
    +import java.io.BufferedInputStream;
    +import java.io.IOException;
    +import java.security.InvalidParameterException;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.LongWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.Job;
    +import org.apache.hadoop.mapreduce.RecordReader;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    +import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    +import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
    +import org.apache.log4j.Logger;
    +import org.codehaus.jackson.JsonFactory;
    +import org.codehaus.jackson.JsonNode;
    +import org.codehaus.jackson.JsonParseException;
    +import org.codehaus.jackson.map.JsonMappingException;
    +import org.codehaus.jackson.map.ObjectMapper;
    +
    +/**
    + * Code ported from Hydra-Spark {package com.pluralsight.hydra.hadoop.io} package
    + * The JsonInputFormat will read two types of JSON formatted data. The default
    + * expectation is each JSON record is newline delimited. This method is
    + * generally faster and is backed by the {@link LineRecordReader} you are likely
    + * familiar with. The other method is 'pretty print' of JSON records, where
    + * records span multiple lines and often have some type of root identifier. This
    + * method is likely slower, but respects record boundaries much like the
    + * LineRecordReader.<br>
    + * <br>
    + * Use of the 'pretty print' reader requires a record identifier.
    + */
    +public class JsonInputFormat extends FileInputFormat<LongWritable, Text> {
    +
    +  private static JsonFactory factory = new JsonFactory();
    +
    +  private static ObjectMapper mapper = new ObjectMapper(factory);
    +
    +  public static final String ONE_RECORD_PER_LINE = "json.input.format.one.record.per.line";
    +
    +  public static final String RECORD_IDENTIFIER = "json.input.format.record.identifier";
    +
    +  @Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
    +      TaskAttemptContext context) throws IOException, InterruptedException {
    +    RecordReader<LongWritable, Text> rdr;
    +    if (context.getConfiguration().getBoolean(ONE_RECORD_PER_LINE, false)) {
    +      rdr = new SimpleJsonRecordReader();
    +    } else {
    +      return new JsonRecordReader();
    +    }
    +    rdr.initialize(split, context);
    +    return rdr;
    +  }
    +
    +  /**
    +   * This class uses the {@link LineRecordReader} to read a line of JSON and
    +   * return it as a Text object.
    +   */
    +  public static class SimpleJsonRecordReader extends RecordReader<LongWritable, Text> {
    +
    +    private LineRecordReader rdr = null;
    +
    +    private LongWritable outkey = new LongWritable(0L);
    +
    +    private Text outvalue = new Text();
    +
    +    @Override public void initialize(InputSplit split, TaskAttemptContext context)
    +        throws IOException, InterruptedException {
    +
    +      rdr = new LineRecordReader();
    +      rdr.initialize(split, context);
    +    }
    +
    +    @Override public boolean nextKeyValue() throws IOException, InterruptedException {
    +      if (rdr.nextKeyValue()) {
    +        outvalue.set(rdr.getCurrentValue());
    +        outkey.set(rdr.getCurrentKey().get());
    +        return true;
    +      } else {
    +        return false;
    +      }
    +    }
    +
    +    @Override public void close() throws IOException {
    +      rdr.close();
    +    }
    +
    +    @Override public float getProgress() throws IOException {
    +      return rdr.getProgress();
    +    }
    +
    +    @Override public LongWritable getCurrentKey() throws IOException, InterruptedException {
    +      return outkey;
    +    }
    +
    +    @Override public Text getCurrentValue() throws IOException, InterruptedException {
    +      return outvalue;
    +    }
    +  }
    +
    +  /**
    +   * This class uses the {@link JsonStreamReader} to read JSON records from a
    +   * file. It respects split boundaries to complete full JSON records, as
    +   * specified by the root identifier. This class will discard any records
    +   * that it was unable to decode using
    +   * {@link JsonInputFormat#decodeLineToJsonNode(String)}
    +   */
    +  public static class JsonRecordReader extends RecordReader<LongWritable, Text> {
    +
    +    private Logger LOG = Logger.getLogger(JsonRecordReader.class);
    +
    +    private JsonStreamReader rdr = null;
    +
    +    private long start = 0, end = 0;
    +
    +    private float toRead = 0;
    +
    +    private String identifier = null;
    +
    +    private Logger log = Logger.getLogger(JsonRecordReader.class);
    +
    +    private Text outJson = new Text();
    +
    +    private LongWritable outKey = new LongWritable();
    +
    +    @Override public void initialize(InputSplit split, TaskAttemptContext context)
    +        throws IOException, InterruptedException {
    +
    +      this.identifier = context.getConfiguration().get(RECORD_IDENTIFIER);
    +
    +      if (this.identifier == null || identifier.isEmpty()) {
    +        throw new InvalidParameterException(JsonInputFormat.RECORD_IDENTIFIER + " is not set.");
    +      } else {
    +        LOG.info("Initializing JsonRecordReader with identifier " + identifier);
    +      }
    +
    +      FileSplit fSplit = (FileSplit) split;
    +
    +      // get relevant data
    +      Path file = fSplit.getPath();
    +
    +      log.info("File is " + file);
    +
    +      start = fSplit.getStart();
    +      end = start + split.getLength();
    +      toRead = end - start;
    +
    +      FSDataInputStream strm = FileSystem.get(context.getConfiguration()).open(file);
    +
    +      if (start != 0) {
    +        strm.seek(start);
    +      }
    +
    +      rdr = new JsonStreamReader(identifier, new BufferedInputStream(strm));
    +    }
    +
    +    @Override public boolean nextKeyValue() throws IOException, InterruptedException {
    +      boolean retval = false;
    +      boolean keepGoing = false;
    +      do {
    +        keepGoing = false;
    +        String record = rdr.getJsonRecord();
    +        if (record != null) {
    +          if (JsonInputFormat.decodeLineToJsonNode(record) == null) {
    +            keepGoing = true;
    +          } else {
    +            outJson.set(record);
    +            outKey.set(rdr.getBytesRead());
    +            retval = true;
    +          }
    +        }
    +      } while (keepGoing);
    +
    +      return retval;
    +    }
    +
    +    @Override public void close() throws IOException {
    +      rdr.close();
    +    }
    +
    +    @Override public float getProgress() throws IOException {
    +      return (float) rdr.getBytesRead() / toRead;
    +    }
    +
    +    @Override public LongWritable getCurrentKey() throws IOException, InterruptedException {
    +      return outKey;
    +    }
    +
    +    @Override public Text getCurrentValue() throws IOException, InterruptedException {
    +      return outJson;
    +    }
    +  }
    +
    +  /**
    +   * Decodes a given string of text to a {@link JsonNode}.
    +   *
    +   * @param line The line of text
    +   * @return The JsonNode or null if a JsonParseException,
    +   * JsonMappingException, or IOException error occurs
    +   */
    +  public static synchronized JsonNode decodeLineToJsonNode(String line) {
    +
    +    try {
    +      return mapper.readTree(line);
    +    } catch (JsonParseException e) {
    +      e.printStackTrace();
    +      return null;
    +    } catch (JsonMappingException e) {
    +      e.printStackTrace();
    --- End diff --
   
    Remove the stack trace; add a logger and throw the exception instead of returning null.
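
    A minimal sketch of what this suggestion might look like. Note that in the codehaus Jackson used here, `JsonParseException` and `JsonMappingException` both extend `IOException`, so a single catch would cover the original three blocks. The `parseTree` stub below stands in for `mapper.readTree(line)` only so the pattern is self-contained; the real method would keep the Jackson call.

    ```java
    import java.io.IOException;
    import java.util.logging.Logger;

    public class DecodeSketch {

      private static final Logger LOGGER = Logger.getLogger(DecodeSketch.class.getName());

      // Stand-in for mapper.readTree(line): rejects anything that does not
      // look like a JSON object. Hypothetical, for illustration only.
      private static String parseTree(String line) throws IOException {
        String trimmed = line.trim();
        if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) {
          throw new IOException("not a JSON object: " + line);
        }
        return trimmed;
      }

      // The reviewer's suggestion: log the failure and rethrow, instead of
      // printing a stack trace and swallowing the error as a null result.
      public static String decodeLineToJsonNode(String line) throws IOException {
        try {
          return parseTree(line);
        } catch (IOException e) {
          LOGGER.severe("Failed to decode JSON line: " + e.getMessage());
          throw e;
        }
      }

      public static void main(String[] args) throws IOException {
        // A valid record decodes normally.
        System.out.println(decodeLineToJsonNode("{\"name\": \"bob\"}"));
        // An invalid record now surfaces as an exception rather than a silent null,
        // so callers such as JsonRecordReader.nextKeyValue() can decide what to do.
        try {
          decodeLineToJsonNode("not json");
        } catch (IOException expected) {
          System.out.println("rethrown as expected");
        }
      }
    }
    ```

    The behavioral change for callers: `nextKeyValue()` currently treats a null return as "skip this record", so widening the signature to `throws IOException` would also require deciding whether a bad record should skip or fail the load.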


---

[GitHub] carbondata pull request #2384: [CARBONDATA-2608] SDK Support JSON data loadi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2384#discussion_r197349708
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/jsoninput/JsonInputFormat.java ---
    @@ -0,0 +1,285 @@
    +    @Override public float getProgress() throws IOException {
    +      return (float) rdr.getBytesRead() / toRead;
    +    }
    +
    +    @Override public LongWritable getCurrentKey() throws IOException, InterruptedException {
    --- End diff --
   
     Remove the unnecessary `throws` declarations from all the methods that never actually throw them.
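
    A sketch of why this cleanup is legal: in Java, an override may declare fewer checked exceptions than the method it overrides, so accessors that only return a field can drop the inherited `throws IOException, InterruptedException` entirely. The abstract class below mirrors the shape of Hadoop's `RecordReader` contract; names are illustrative, not the merged code.

    ```java
    import java.io.IOException;

    public class ThrowsSketch {

      // Mirrors the RecordReader contract: the abstract methods declare
      // checked exceptions, but overrides are free to declare fewer.
      abstract static class Reader {
        abstract Long getCurrentKey() throws IOException, InterruptedException;
        abstract String getCurrentValue() throws IOException, InterruptedException;
      }

      // Hypothetical trimmed version of JsonRecordReader's accessors: they
      // only return fields, so the unused throws clauses are simply dropped.
      static class TrimmedReader extends Reader {
        private final Long outKey = 42L;
        private final String outJson = "{\"id\": 42}";

        @Override Long getCurrentKey() {
          return outKey;
        }

        @Override String getCurrentValue() {
          return outJson;
        }
      }

      public static void main(String[] args) {
        // No try/catch is needed at the call site when the reference has
        // the subclass type, since the trimmed methods declare no exceptions.
        TrimmedReader rdr = new TrimmedReader();
        System.out.println(rdr.getCurrentKey() + " -> " + rdr.getCurrentValue());
      }
    }
    ```

    Callers that go through the `RecordReader` supertype must still handle the declared exceptions, so the cleanup is purely local to the subclass and does not change the Hadoop API surface.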


---