[GitHub] carbondata pull request #2384: [CARBONDATA-2608] SDK Support JSON data loadi...


[GitHub] carbondata pull request #2384: [CARBONDATA-2608] SDK Support JSON data loadi...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2384#discussion_r197350068
 
    --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java ---
    @@ -0,0 +1,146 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.sdk.file;
    +
    +import java.io.IOException;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Random;
    +import java.util.UUID;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
    +import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    +import org.apache.carbondata.util.JsonCarbonUtil;
    +
    +import com.fasterxml.jackson.core.type.TypeReference;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.io.NullWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.mapreduce.JobID;
    +import org.apache.hadoop.mapreduce.RecordWriter;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.TaskAttemptID;
    +import org.apache.hadoop.mapreduce.TaskID;
    +import org.apache.hadoop.mapreduce.TaskType;
    +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
    +
    +/**
    + * Writer Implementation to write Json Record to carbondata file.
    + * json writer requires the path of json file and carbon schema.
    + */
    +@InterfaceAudience.User public class JsonCarbonWriter extends CarbonWriter {
    +  private RecordWriter<NullWritable, ObjectArrayWritable> recordWriter;
    +  private TaskAttemptContext context;
    +  private ObjectArrayWritable writable;
    +  private Schema carbonSchema;
    +
    +  JsonCarbonWriter(CarbonLoadModel loadModel, Schema carbonSchema) throws IOException {
    +    Configuration OutputHadoopConf = new Configuration();
    +    CarbonTableOutputFormat.setLoadModel(OutputHadoopConf, loadModel);
    +    CarbonTableOutputFormat outputFormat = new CarbonTableOutputFormat();
    +    JobID jobId = new JobID(UUID.randomUUID().toString(), 0);
    +    Random random = new Random();
    +    TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt());
    +    TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt());
    +    TaskAttemptContextImpl context = new TaskAttemptContextImpl(OutputHadoopConf, attemptID);
    +    this.recordWriter = outputFormat.getRecordWriter(context);
    +    this.context = context;
    +    this.writable = new ObjectArrayWritable();
    +    this.carbonSchema = carbonSchema;
    +  }
    +
    +  /**
    +   * Write single row data, accepts one row of data as json string
    +   *
    +   * @param object (json row as a string)
    +   * @throws IOException
    +   */
    +  @Override public void write(Object object) throws IOException {
    --- End diff --
   
    This class should directly use the JsonInputFormat readers; it should not depend directly on the Jackson parser.

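    For illustration, here is a minimal sketch of the direction this comment suggests: drive the load from JsonInputFormat's record readers so that the SDK never parses JSON itself. Only JsonInputFormat and JsonCarbonWriter come from the quoted diffs; the class name and the wiring below are assumptions, not the PR's actual code.

        import java.io.IOException;

        import org.apache.carbondata.processing.loading.jsoninput.JsonInputFormat;
        import org.apache.carbondata.sdk.file.JsonCarbonWriter;

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.InputSplit;
        import org.apache.hadoop.mapreduce.Job;
        import org.apache.hadoop.mapreduce.RecordReader;
        import org.apache.hadoop.mapreduce.TaskAttemptID;
        import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
        import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

        public final class JsonLoadSketch {
          public static void load(String jsonFilePath, JsonCarbonWriter writer)
              throws IOException, InterruptedException {
            Configuration conf = new Configuration();
            // One JSON record per line; the quoted format also has a
            // record-identifier mode for pretty-printed input.
            conf.setBoolean(JsonInputFormat.ONE_RECORD_PER_LINE, true);
            Job job = Job.getInstance(conf);
            FileInputFormat.addInputPath(job, new Path(jsonFilePath));
            JsonInputFormat format = new JsonInputFormat();
            for (InputSplit split : format.getSplits(job)) {
              TaskAttemptContextImpl context =
                  new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
              // The quoted createRecordReader already initializes the
              // line-based reader when ONE_RECORD_PER_LINE is set.
              RecordReader<LongWritable, Text> reader = format.createRecordReader(split, context);
              while (reader.nextKeyValue()) {
                // Hand the raw JSON row to the writer; conversion happens downstream.
                writer.write(reader.getCurrentValue().toString());
              }
              reader.close();
            }
          }
        }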

---

[GitHub] carbondata pull request #2384: [CARBONDATA-2608] SDK Support JSON data loadi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2384#discussion_r197350235
 
    --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java ---
    @@ -0,0 +1,146 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.sdk.file;
    +
    +import java.io.IOException;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Random;
    +import java.util.UUID;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
    +import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    +import org.apache.carbondata.util.JsonCarbonUtil;
    +
    +import com.fasterxml.jackson.core.type.TypeReference;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.io.NullWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.mapreduce.JobID;
    +import org.apache.hadoop.mapreduce.RecordWriter;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.TaskAttemptID;
    +import org.apache.hadoop.mapreduce.TaskID;
    +import org.apache.hadoop.mapreduce.TaskType;
    +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
    +
    +/**
    + * Writer Implementation to write Json Record to carbondata file.
    + * json writer requires the path of json file and carbon schema.
    + */
    +@InterfaceAudience.User public class JsonCarbonWriter extends CarbonWriter {
    +  private RecordWriter<NullWritable, ObjectArrayWritable> recordWriter;
    +  private TaskAttemptContext context;
    +  private ObjectArrayWritable writable;
    +  private Schema carbonSchema;
    +
    +  JsonCarbonWriter(CarbonLoadModel loadModel, Schema carbonSchema) throws IOException {
    +    Configuration OutputHadoopConf = new Configuration();
    +    CarbonTableOutputFormat.setLoadModel(OutputHadoopConf, loadModel);
    +    CarbonTableOutputFormat outputFormat = new CarbonTableOutputFormat();
    +    JobID jobId = new JobID(UUID.randomUUID().toString(), 0);
    +    Random random = new Random();
    +    TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt());
    +    TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt());
    +    TaskAttemptContextImpl context = new TaskAttemptContextImpl(OutputHadoopConf, attemptID);
    +    this.recordWriter = outputFormat.getRecordWriter(context);
    +    this.context = context;
    +    this.writable = new ObjectArrayWritable();
    +    this.carbonSchema = carbonSchema;
    +  }
    +
    +  /**
    +   * Write single row data, accepts one row of data as json string
    +   *
    +   * @param object (json row as a string)
    +   * @throws IOException
    +   */
    +  @Override public void write(Object object) throws IOException {
    +    Objects.requireNonNull(object, "Input cannot be null");
    +    try {
    +      Map<String, Object> jsonNodeMap;
    +      ObjectMapper objectMapper = new ObjectMapper();
    +      try {
    +        jsonNodeMap =
    +            objectMapper.readValue((String) object, new TypeReference<Map<String, Object>>() {
    +            });
    +      } catch (IOException e) {
    +        throw new IOException("Failed to parse Json row string ");
    +      }
    +      // convert json object to carbon object.
    +      Object[] writeObjects = JsonCarbonUtil.jsonToCarbonRecord(jsonNodeMap, carbonSchema);
    --- End diff --
   
    It should work like the CSV flow, so it should not convert to data types here. It should include the DataConverterStep.

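    For illustration, a sketch of what write() could look like if, as suggested, type conversion moves out of the writer and into the load pipeline's converter step (the way the CSV flow works). It assumes the pipeline's input processor can parse a raw JSON row against the schema; this is not the PR's final code.

        @Override public void write(Object object) throws IOException {
          Objects.requireNonNull(object, "Input cannot be null");
          try {
            // Forward the raw JSON string untouched; the DataConverterStep in
            // the load pipeline parses it and applies the schema's data types.
            writable.set(new Object[] { (String) object });
            recordWriter.write(NullWritable.get(), writable);
          } catch (Exception e) {
            close();
            throw new IOException(e);
          }
        }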

---

[GitHub] carbondata pull request #2384: [CARBONDATA-2608] SDK Support JSON data loadi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2384#discussion_r197350279
 
    --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java ---
    @@ -0,0 +1,146 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.sdk.file;
    +
    +import java.io.IOException;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Random;
    +import java.util.UUID;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
    +import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    +import org.apache.carbondata.util.JsonCarbonUtil;
    +
    +import com.fasterxml.jackson.core.type.TypeReference;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.io.NullWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.mapreduce.JobID;
    +import org.apache.hadoop.mapreduce.RecordWriter;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.TaskAttemptID;
    +import org.apache.hadoop.mapreduce.TaskID;
    +import org.apache.hadoop.mapreduce.TaskType;
    +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
    +
    +/**
    + * Writer Implementation to write Json Record to carbondata file.
    + * json writer requires the path of json file and carbon schema.
    + */
    +@InterfaceAudience.User public class JsonCarbonWriter extends CarbonWriter {
    +  private RecordWriter<NullWritable, ObjectArrayWritable> recordWriter;
    +  private TaskAttemptContext context;
    +  private ObjectArrayWritable writable;
    +  private Schema carbonSchema;
    +
    +  JsonCarbonWriter(CarbonLoadModel loadModel, Schema carbonSchema) throws IOException {
    +    Configuration OutputHadoopConf = new Configuration();
    +    CarbonTableOutputFormat.setLoadModel(OutputHadoopConf, loadModel);
    +    CarbonTableOutputFormat outputFormat = new CarbonTableOutputFormat();
    +    JobID jobId = new JobID(UUID.randomUUID().toString(), 0);
    +    Random random = new Random();
    +    TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt());
    +    TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt());
    +    TaskAttemptContextImpl context = new TaskAttemptContextImpl(OutputHadoopConf, attemptID);
    +    this.recordWriter = outputFormat.getRecordWriter(context);
    +    this.context = context;
    +    this.writable = new ObjectArrayWritable();
    +    this.carbonSchema = carbonSchema;
    +  }
    +
    +  /**
    +   * Write single row data, accepts one row of data as json string
    +   *
    +   * @param object (json row as a string)
    +   * @throws IOException
    +   */
    +  @Override public void write(Object object) throws IOException {
    +    Objects.requireNonNull(object, "Input cannot be null");
    +    try {
    +      Map<String, Object> jsonNodeMap;
    +      ObjectMapper objectMapper = new ObjectMapper();
    +      try {
    +        jsonNodeMap =
    +            objectMapper.readValue((String) object, new TypeReference<Map<String, Object>>() {
    +            });
    +      } catch (IOException e) {
    +        throw new IOException("Failed to parse Json row string ");
    +      }
    +      // convert json object to carbon object.
    +      Object[] writeObjects = JsonCarbonUtil.jsonToCarbonRecord(jsonNodeMap, carbonSchema);
    +      writable.set(writeObjects);
    +      recordWriter.write(NullWritable.get(), writable);
    +    } catch (Exception e) {
    +      close();
    +      throw new IOException(e);
    +    }
    +  }
    +
    +  /**
    +   * Takes file or directory path,
    +   * containing array of json rows to write carbondata files.
    +   *
    +   * @param inputFilePath
    +   * @param recordIdentifier
    +   * @throws IOException
    +   */
    +  public void writeFromJsonFile(String inputFilePath, String recordIdentifier) throws IOException {
    --- End diff --
   
    It should not be here.


---

[GitHub] carbondata pull request #2384: [CARBONDATA-2608] SDK Support JSON data loadi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2384#discussion_r197350393
 
    --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonReader.java ---
    @@ -0,0 +1,130 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.sdk.file;
    +
    +import java.io.IOException;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.common.annotations.InterfaceStability;
    +import org.apache.carbondata.core.util.CarbonTaskInfo;
    +import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
    +
    +import org.apache.hadoop.mapreduce.RecordReader;
    +
    +/**
    + * Reader for JsonFile that uses JsonInputFormat with jackson parser
    + */
    +@InterfaceAudience.User
    +@InterfaceStability.Evolving
    +public class JsonReader<T> {
    --- End diff --
   
    Why add one more reader? Why don't you use the JsonInputFormat reader?


---

[GitHub] carbondata pull request #2384: [CARBONDATA-2608] SDK Support JSON data loadi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2384#discussion_r197350528
 
    --- Diff: store/sdk/src/main/java/org/apache/carbondata/util/JsonCarbonUtil.java ---
    @@ -0,0 +1,197 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.util;
    +
    +import java.io.IOException;
    +import java.math.BigDecimal;
    +import java.text.ParseException;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator;
    +import org.apache.carbondata.core.metadata.datatype.ArrayType;
    +import org.apache.carbondata.core.metadata.datatype.DataType;
    +import org.apache.carbondata.core.metadata.datatype.DataTypes;
    +import org.apache.carbondata.core.metadata.datatype.StructField;
    +import org.apache.carbondata.core.metadata.datatype.StructType;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.processing.loading.complexobjects.ArrayObject;
    +import org.apache.carbondata.processing.loading.complexobjects.StructObject;
    +import org.apache.carbondata.sdk.file.Field;
    +import org.apache.carbondata.sdk.file.Schema;
    +
    +public class JsonCarbonUtil {
    +
    +
    +  public static Object[] jsonToCarbonRecord(Map<String, Object> jsonNodeMap, Schema carbonSchema)
    --- End diff --
   
    It should directly use the converter step of the load flow.


---

[GitHub] carbondata issue #2384: [CARBONDATA-2608] SDK Support JSON data loading dire...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2384
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6465/



---

[GitHub] carbondata issue #2384: [CARBONDATA-2608] SDK Support JSON data loading dire...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2384
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5296/



---

[GitHub] carbondata pull request #2384: [CARBONDATA-2608] SDK Support JSON data loadi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ajantha-bhat commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2384#discussion_r197367547
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/jsoninput/JsonInputFormat.java ---
    @@ -0,0 +1,285 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.loading.jsoninput;
    +
    +import java.io.BufferedInputStream;
    +import java.io.IOException;
    +import java.security.InvalidParameterException;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.LongWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.Job;
    +import org.apache.hadoop.mapreduce.RecordReader;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    +import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    +import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
    +import org.apache.log4j.Logger;
    +import org.codehaus.jackson.JsonFactory;
    +import org.codehaus.jackson.JsonNode;
    +import org.codehaus.jackson.JsonParseException;
    +import org.codehaus.jackson.map.JsonMappingException;
    +import org.codehaus.jackson.map.ObjectMapper;
    +
    +/**
    + * Code ported from Hydra-Spark {package com.pluralsight.hydra.hadoop.io} package
    + * The JsonInputFormat will read two types of JSON formatted data. The default
    + * expectation is each JSON record is newline delimited. This method is
    + * generally faster and is backed by the {@link LineRecordReader} you are likely
    + * familiar with. The other method is 'pretty print' of JSON records, where
    + * records span multiple lines and often have some type of root identifier. This
    + * method is likely slower, but respects record boundaries much like the
    + * LineRecordReader.<br>
    + * <br>
    + * Use of the 'pretty print' reader requires a record identifier.
    + */
    +public class JsonInputFormat extends FileInputFormat<LongWritable, Text> {
    +
    +  private static JsonFactory factory = new JsonFactory();
    +
    +  private static ObjectMapper mapper = new ObjectMapper(factory);
    +
    +  public static final String ONE_RECORD_PER_LINE = "json.input.format.one.record.per.line";
    +
    +  public static final String RECORD_IDENTIFIER = "json.input.format.record.identifier";
    +
    +  @Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
    +      TaskAttemptContext context) throws IOException, InterruptedException {
    +    RecordReader<LongWritable, Text> rdr;
    +    if (context.getConfiguration().getBoolean(ONE_RECORD_PER_LINE, false)) {
    +      rdr = new SimpleJsonRecordReader();
    +    } else {
    +      return new JsonRecordReader();
    +    }
    +    rdr.initialize(split, context);
    +    return rdr;
    +  }
    +
    +  /**
    +   * This class uses the {@link LineRecordReader} to read a line of JSON and
    +   * return it as a Text object.
    +   */
    +  public static class SimpleJsonRecordReader extends RecordReader<LongWritable, Text> {
    +
    +    private LineRecordReader rdr = null;
    +
    +    private LongWritable outkey = new LongWritable(0L);
    +
    +    private Text outvalue = new Text();
    +
    +    @Override public void initialize(InputSplit split, TaskAttemptContext context)
    +        throws IOException, InterruptedException {
    +
    +      rdr = new LineRecordReader();
    +      rdr.initialize(split, context);
    +    }
    +
    +    @Override public boolean nextKeyValue() throws IOException, InterruptedException {
    +      if (rdr.nextKeyValue()) {
    +        outvalue.set(rdr.getCurrentValue());
    +        outkey.set(rdr.getCurrentKey().get());
    +        return true;
    +      } else {
    +        return false;
    +      }
    +    }
    +
    +    @Override public void close() throws IOException {
    +      rdr.close();
    +    }
    +
    +    @Override public float getProgress() throws IOException {
    +      return rdr.getProgress();
    +    }
    +
    +    @Override public LongWritable getCurrentKey() throws IOException, InterruptedException {
    +      return outkey;
    +    }
    +
    +    @Override public Text getCurrentValue() throws IOException, InterruptedException {
    +      return outvalue;
    +    }
    +  }
    +
    +  /**
    +   * This class uses the {@link JsonStreamReader} to read JSON records from a
    +   * file. It respects split boundaries to complete full JSON records, as
    +   * specified by the root identifier. This class will discard any records
    +   * that it was unable to decode using
    +   * {@link JsonInputFormat#decodeLineToJsonNode(String)}
    +   */
    +  public static class JsonRecordReader extends RecordReader<LongWritable, Text> {
    +
    +    private Logger LOG = Logger.getLogger(JsonRecordReader.class);
    +
    +    private JsonStreamReader rdr = null;
    +
    +    private long start = 0, end = 0;
    +
    +    private float toRead = 0;
    +
    +    private String identifier = null;
    +
    +    private Logger log = Logger.getLogger(JsonRecordReader.class);
    +
    +    private Text outJson = new Text();
    +
    +    private LongWritable outKey = new LongWritable();
    +
    +    @Override public void initialize(InputSplit split, TaskAttemptContext context)
    +        throws IOException, InterruptedException {
    +
    +      this.identifier = context.getConfiguration().get(RECORD_IDENTIFIER);
    +
    +      if (this.identifier == null || identifier.isEmpty()) {
    +        throw new InvalidParameterException(JsonInputFormat.RECORD_IDENTIFIER + " is not set.");
    +      } else {
    +        LOG.info("Initializing JsonRecordReader with identifier " + identifier);
    +      }
    +
    +      FileSplit fSplit = (FileSplit) split;
    +
    +      // get relevant data
    +      Path file = fSplit.getPath();
    +
    +      log.info("File is " + file);
    +
    +      start = fSplit.getStart();
    +      end = start + split.getLength();
    +      toRead = end - start;
    +
    +      FSDataInputStream strm = FileSystem.get(context.getConfiguration()).open(file);
    +
    +      if (start != 0) {
    +        strm.seek(start);
    +      }
    +
    +      rdr = new JsonStreamReader(identifier, new BufferedInputStream(strm));
    +    }
    +
    +    @Override public boolean nextKeyValue() throws IOException, InterruptedException {
    +      boolean retval = false;
    +      boolean keepGoing = false;
    +      do {
    +        keepGoing = false;
    +        String record = rdr.getJsonRecord();
    +        if (record != null) {
    +          if (JsonInputFormat.decodeLineToJsonNode(record) == null) {
    +            keepGoing = true;
    +          } else {
    +            outJson.set(record);
    +            outKey.set(rdr.getBytesRead());
    +            retval = true;
    +          }
    +        }
    +      } while (keepGoing);
    +
    +      return retval;
    +    }
    +
    +    @Override public void close() throws IOException {
    +      rdr.close();
    +    }
    +
    +    @Override public float getProgress() throws IOException {
    +      return (float) rdr.getBytesRead() / toRead;
    +    }
    +
    +    @Override public LongWritable getCurrentKey() throws IOException, InterruptedException {
    +      return outKey;
    +    }
    +
    +    @Override public Text getCurrentValue() throws IOException, InterruptedException {
    +      return outJson;
    +    }
    +  }
    +
    +  /**
    +   * Decodes a given string of text to a {@link JsonNode}.
    +   *
    +   * @param line The line of text
    +   * @return The JsonNode or null if a JsonParseException,
    +   * JsonMappingException, or IOException error occurs
    +   */
    +  public static synchronized JsonNode decodeLineToJsonNode(String line) {
    --- End diff --
   
    Yes, it is required: JsonRecordReader is a static class, so this method is referenced from within it.
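
    To illustrate the constraint: a static nested class has no enclosing instance, so it can only call static members of the outer class directly. A minimal, hypothetical example:

        public class Outer {
          static String helper(String line) {
            return line.trim();
          }

          public static class Nested {
            String use(String line) {
              // Legal only because helper is static: Nested has no
              // enclosing Outer instance to call instance methods on.
              return Outer.helper(line);
            }
          }
        }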


---

[GitHub] carbondata pull request #2384: [CARBONDATA-2608] SDK Support JSON data loadi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ajantha-bhat commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2384#discussion_r197372247
 
    --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonReader.java ---
    @@ -0,0 +1,130 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.sdk.file;
    +
    +import java.io.IOException;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.common.annotations.InterfaceStability;
    +import org.apache.carbondata.core.util.CarbonTaskInfo;
    +import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
    +
    +import org.apache.hadoop.mapreduce.RecordReader;
    +
    +/**
    + * Reader for JsonFile that uses JsonInputFormat with jackson parser
    + */
    +@InterfaceAudience.User
    +@InterfaceStability.Evolving
    +public class JsonReader<T> {
    --- End diff --
   
    This is just a wrapper; it uses the same record reader from JsonInputFormat.
   
    JsonReader is a wrapper over a list of JsonRecordReader instances from JsonInputFormat, similar to how CarbonReader wraps a list of CarbonRecordReader instances from CsvInputFormat (one for each split) and other wrapper iterators; a generic sketch of this pattern follows below.
   
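    A minimal sketch of that wrapper pattern (generic and hypothetical; the PR's JsonReader carries more state, such as the task-info setup):

        import java.io.IOException;
        import java.util.List;

        import org.apache.hadoop.mapreduce.RecordReader;

        public class MultiSplitReader<K, V> {
          // One underlying record reader per input split, consumed in order.
          private final List<RecordReader<K, V>> readers;
          private int current = 0;

          public MultiSplitReader(List<RecordReader<K, V>> readers) {
            this.readers = readers;
          }

          // Advances the underlying reader, so each hasNext() call must be
          // paired with exactly one readNextRow() call.
          public boolean hasNext() throws IOException, InterruptedException {
            while (current < readers.size()) {
              if (readers.get(current).nextKeyValue()) {
                return true;
              }
              // Current split is drained: close it and move to the next one.
              readers.get(current).close();
              current++;
            }
            return false;
          }

          public V readNextRow() throws IOException, InterruptedException {
            return readers.get(current).getCurrentValue();
          }
        }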



---

[GitHub] carbondata pull request #2384: [CARBONDATA-2608] SDK Support JSON data loadi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ajantha-bhat commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2384#discussion_r197638025
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---
    @@ -3116,5 +3117,48 @@ public static void setLocalDictColumnsToWrapperSchema(List<ColumnSchema> columns
           }
         }
       }
    -}
     
    +  /**
    +   * Utility function to read a whole file as a string,
    +   * Must not use this if the file is very huge. As it may result in memory exhaustion.
    +   * @param filePath
    +   * @return
    +   */
    +  public static String readFromFile(String filePath) {
    --- End diff --
   
    ok. done


---

[GitHub] carbondata pull request #2384: [CARBONDATA-2608] SDK Support JSON data loadi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ajantha-bhat commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2384#discussion_r197638028
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---
    @@ -3116,5 +3117,48 @@ public static void setLocalDictColumnsToWrapperSchema(List<ColumnSchema> columns
           }
         }
       }
    -}
     
    +  /**
    +   * Utility function to read a whole file as a string,
    +   * Must not use this if the file is very huge. As it may result in memory exhaustion.
    +   * @param filePath
    +   * @return
    +   */
    +  public static String readFromFile(String filePath) {
    +    File file = new File(filePath);
    +    URI uri = file.toURI();
    +    byte[] bytes;
    +    try {
    +      bytes = java.nio.file.Files.readAllBytes(java.nio.file.Paths.get(uri));
    +    } catch (IOException e) {
    +      e.printStackTrace();
    +      return "ERROR loading file " + filePath;
    +    }
    +    try {
    +      return new String(bytes, "UTF-8");
    +    } catch (UnsupportedEncodingException e) {
    +      return "ERROR while encoding to UTF-8 for file " + filePath;
    +    }
    +  }
    +
    +  /**
    +   * get list of file paths with requested extensions from the path
    +   * @param carbonFile
    +   * @param jsonFileList
    +   * @param fileExtension
    +   */
    +  public static void getFileList(CarbonFile carbonFile, List<String> jsonFileList,
    --- End diff --
   
    ok. done
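
    For reference, an idiomatic variant of the quoted readFromFile: it propagates failures as IOException instead of returning error-message strings, and uses StandardCharsets.UTF_8 so no UnsupportedEncodingException can arise. A sketch, not necessarily what was merged.

        import java.io.IOException;
        import java.nio.charset.StandardCharsets;
        import java.nio.file.Files;
        import java.nio.file.Paths;

        public final class FileReadSketch {
          /**
           * Reads a whole file into a String. Must not be used for very large
           * files, since the entire content is buffered in memory.
           */
          public static String readFromFile(String filePath) throws IOException {
            byte[] bytes = Files.readAllBytes(Paths.get(filePath));
            return new String(bytes, StandardCharsets.UTF_8);
          }
        }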


---

[GitHub] carbondata pull request #2384: [CARBONDATA-2608] SDK Support JSON data loadi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ajantha-bhat commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2384#discussion_r197638029
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/jsoninput/JsonInputFormat.java ---
    @@ -0,0 +1,285 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.loading.jsoninput;
    +
    +import java.io.BufferedInputStream;
    +import java.io.IOException;
    +import java.security.InvalidParameterException;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.LongWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.Job;
    +import org.apache.hadoop.mapreduce.RecordReader;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    +import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    +import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
    +import org.apache.log4j.Logger;
    +import org.codehaus.jackson.JsonFactory;
    +import org.codehaus.jackson.JsonNode;
    +import org.codehaus.jackson.JsonParseException;
    +import org.codehaus.jackson.map.JsonMappingException;
    +import org.codehaus.jackson.map.ObjectMapper;
    +
    +/**
    + * Code ported from Hydra-Spark {package com.pluralsight.hydra.hadoop.io} package
    + * The JsonInputFormat will read two types of JSON formatted data. The default
    + * expectation is each JSON record is newline delimited. This method is
    + * generally faster and is backed by the {@link LineRecordReader} you are likely
    + * familiar with. The other method is 'pretty print' of JSON records, where
    + * records span multiple lines and often have some type of root identifier. This
    + * method is likely slower, but respects record boundaries much like the
    + * LineRecordReader.<br>
    + * <br>
    + * Use of the 'pretty print' reader requires a record identifier.
    + */
    +public class JsonInputFormat extends FileInputFormat<LongWritable, Text> {
    +
    +  private static JsonFactory factory = new JsonFactory();
    +
    +  private static ObjectMapper mapper = new ObjectMapper(factory);
    +
    +  public static final String ONE_RECORD_PER_LINE = "json.input.format.one.record.per.line";
    +
    +  public static final String RECORD_IDENTIFIER = "json.input.format.record.identifier";
    +
    +  @Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
    +      TaskAttemptContext context) throws IOException, InterruptedException {
    +    RecordReader<LongWritable, Text> rdr;
    +    if (context.getConfiguration().getBoolean(ONE_RECORD_PER_LINE, false)) {
    --- End diff --
   
    ok. done


---

[GitHub] carbondata pull request #2384: [CARBONDATA-2608] SDK Support JSON data loadi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ajantha-bhat commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2384#discussion_r197638048
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/jsoninput/JsonInputFormat.java ---
    @@ -0,0 +1,285 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.loading.jsoninput;
    +
    +import java.io.BufferedInputStream;
    +import java.io.IOException;
    +import java.security.InvalidParameterException;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.LongWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.Job;
    +import org.apache.hadoop.mapreduce.RecordReader;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    +import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    +import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
    +import org.apache.log4j.Logger;
    +import org.codehaus.jackson.JsonFactory;
    +import org.codehaus.jackson.JsonNode;
    +import org.codehaus.jackson.JsonParseException;
    +import org.codehaus.jackson.map.JsonMappingException;
    +import org.codehaus.jackson.map.ObjectMapper;
    +
    +/**
    + * Code ported from Hydra-Spark {package com.pluralsight.hydra.hadoop.io} package
    + * The JsonInputFormat will read two types of JSON formatted data. The default
    + * expectation is each JSON record is newline delimited. This method is
    + * generally faster and is backed by the {@link LineRecordReader} you are likely
    + * familiar with. The other method is 'pretty print' of JSON records, where
    + * records span multiple lines and often have some type of root identifier. This
    + * method is likely slower, but respects record boundaries much like the
    + * LineRecordReader.<br>
    + * <br>
    + * Use of the 'pretty print' reader requires a record identifier.
    + */
    +public class JsonInputFormat extends FileInputFormat<LongWritable, Text> {
    +
    +  private static JsonFactory factory = new JsonFactory();
    +
    +  private static ObjectMapper mapper = new ObjectMapper(factory);
    +
    +  public static final String ONE_RECORD_PER_LINE = "json.input.format.one.record.per.line";
    +
    +  public static final String RECORD_IDENTIFIER = "json.input.format.record.identifier";
    +
    +  @Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
    +      TaskAttemptContext context) throws IOException, InterruptedException {
    +    RecordReader<LongWritable, Text> rdr;
    +    if (context.getConfiguration().getBoolean(ONE_RECORD_PER_LINE, false)) {
    +      rdr = new SimpleJsonRecordReader();
    +    } else {
    +      return new JsonRecordReader();
    +    }
    +    rdr.initialize(split, context);
    +    return rdr;
    +  }
    +
    +  /**
    +   * This class uses the {@link LineRecordReader} to read a line of JSON and
    +   * return it as a Text object.
    +   */
    +  public static class SimpleJsonRecordReader extends RecordReader<LongWritable, Text> {
    +
    +    private LineRecordReader rdr = null;
    +
    +    private LongWritable outkey = new LongWritable(0L);
    +
    +    private Text outvalue = new Text();
    +
    +    @Override public void initialize(InputSplit split, TaskAttemptContext context)
    +        throws IOException, InterruptedException {
    +
    +      rdr = new LineRecordReader();
    +      rdr.initialize(split, context);
    +    }
    +
    +    @Override public boolean nextKeyValue() throws IOException, InterruptedException {
    +      if (rdr.nextKeyValue()) {
    +        outvalue.set(rdr.getCurrentValue());
    +        outkey.set(rdr.getCurrentKey().get());
    +        return true;
    +      } else {
    +        return false;
    +      }
    +    }
    +
    +    @Override public void close() throws IOException {
    +      rdr.close();
    +    }
    +
    +    @Override public float getProgress() throws IOException {
    +      return rdr.getProgress();
    +    }
    +
    +    @Override public LongWritable getCurrentKey() throws IOException, InterruptedException {
    +      return outkey;
    +    }
    +
    +    @Override public Text getCurrentValue() throws IOException, InterruptedException {
    +      return outvalue;
    +    }
    +  }
    +
    +  /**
    +   * This class uses the {@link JsonStreamReader} to read JSON records from a
    +   * file. It respects split boundaries to complete full JSON records, as
    +   * specified by the root identifier. This class will discard any records
    +   * that it was unable to decode using
    +   * {@link JsonInputFormat#decodeLineToJsonNode(String)}
    +   */
    +  public static class JsonRecordReader extends RecordReader<LongWritable, Text> {
    +
    +    private Logger LOG = Logger.getLogger(JsonRecordReader.class);
    +
    +    private JsonStreamReader rdr = null;
    +
    +    private long start = 0, end = 0;
    +
    +    private float toRead = 0;
    +
    +    private String identifier = null;
    +
    +    private Logger log = Logger.getLogger(JsonRecordReader.class);
    +
    +    private Text outJson = new Text();
    +
    +    private LongWritable outKey = new LongWritable();
    +
    +    @Override public void initialize(InputSplit split, TaskAttemptContext context)
    +        throws IOException, InterruptedException {
    +
    +      this.identifier = context.getConfiguration().get(RECORD_IDENTIFIER);
    --- End diff --
   
    ok. done


---

[GitHub] carbondata pull request #2384: [CARBONDATA-2608] SDK Support JSON data loadi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ajantha-bhat commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2384#discussion_r197638055
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/jsoninput/JsonInputFormat.java ---
    @@ -0,0 +1,285 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.processing.loading.jsoninput;
    +
    +import java.io.BufferedInputStream;
    +import java.io.IOException;
    +import java.security.InvalidParameterException;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.LongWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.Job;
    +import org.apache.hadoop.mapreduce.RecordReader;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    +import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    +import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
    +import org.apache.log4j.Logger;
    +import org.codehaus.jackson.JsonFactory;
    +import org.codehaus.jackson.JsonNode;
    +import org.codehaus.jackson.JsonParseException;
    +import org.codehaus.jackson.map.JsonMappingException;
    +import org.codehaus.jackson.map.ObjectMapper;
    +
    +/**
    + * Code ported from Hydra-Spark {package com.pluralsight.hydra.hadoop.io} package
    + * The JsonInputFormat will read two types of JSON formatted data. The default
    + * expectation is each JSON record is newline delimited. This method is
    + * generally faster and is backed by the {@link LineRecordReader} you are likely
    + * familiar with. The other method is 'pretty print' of JSON records, where
    + * records span multiple lines and often have some type of root identifier. This
    + * method is likely slower, but respects record boundaries much like the
    + * LineRecordReader.<br>
    + * <br>
    + * Use of the 'pretty print' reader requires a record identifier.
    + */
    +public class JsonInputFormat extends FileInputFormat<LongWritable, Text> {
    +
    +  private static JsonFactory factory = new JsonFactory();
    +
    +  private static ObjectMapper mapper = new ObjectMapper(factory);
    +
    +  public static final String ONE_RECORD_PER_LINE = "json.input.format.one.record.per.line";
    +
    +  public static final String RECORD_IDENTIFIER = "json.input.format.record.identifier";
    +
    +  @Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
    +      TaskAttemptContext context) throws IOException, InterruptedException {
    +    RecordReader<LongWritable, Text> rdr;
    +    if (context.getConfiguration().getBoolean(ONE_RECORD_PER_LINE, false)) {
    +      rdr = new SimpleJsonRecordReader();
    +    } else {
    +      return new JsonRecordReader();
    +    }
    +    rdr.initialize(split, context);
    +    return rdr;
    +  }
    +
    +  /**
    +   * This class uses the {@link LineRecordReader} to read a line of JSON and
    +   * return it as a Text object.
    +   */
    +  public static class SimpleJsonRecordReader extends RecordReader<LongWritable, Text> {
    +
    +    private LineRecordReader rdr = null;
    +
    +    private LongWritable outkey = new LongWritable(0L);
    +
    +    private Text outvalue = new Text();
    +
    +    @Override public void initialize(InputSplit split, TaskAttemptContext context)
    +        throws IOException, InterruptedException {
    +
    +      rdr = new LineRecordReader();
    +      rdr.initialize(split, context);
    +    }
    +
    +    @Override public boolean nextKeyValue() throws IOException, InterruptedException {
    +      if (rdr.nextKeyValue()) {
    +        outvalue.set(rdr.getCurrentValue());
    +        outkey.set(rdr.getCurrentKey().get());
    +        return true;
    +      } else {
    +        return false;
    +      }
    +    }
    +
    +    @Override public void close() throws IOException {
    +      rdr.close();
    +    }
    +
    +    @Override public float getProgress() throws IOException {
    +      return rdr.getProgress();
    +    }
    +
    +    @Override public LongWritable getCurrentKey() throws IOException, InterruptedException {
    +      return outkey;
    +    }
    +
    +    @Override public Text getCurrentValue() throws IOException, InterruptedException {
    +      return outvalue;
    +    }
    +  }
    +
    +  /**
    +   * This class uses the {@link JsonStreamReader} to read JSON records from a
    +   * file. It respects split boundaries to complete full JSON records, as
    +   * specified by the root identifier. This class will discard any records
    +   * that it was unable to decode using
    +   * {@link JsonInputFormat#decodeLineToJsonNode(String)}
    +   */
    +  public static class JsonRecordReader extends RecordReader<LongWritable, Text> {
    +
    +    private Logger LOG = Logger.getLogger(JsonRecordReader.class);
    +
    +    private JsonStreamReader rdr = null;
    +
    +    private long start = 0, end = 0;
    +
    +    private float toRead = 0;
    +
    +    private String identifier = null;
    +
    +    private Logger log = Logger.getLogger(JsonRecordReader.class);
    +
    +    private Text outJson = new Text();
    +
    +    private LongWritable outKey = new LongWritable();
    +
    +    @Override public void initialize(InputSplit split, TaskAttemptContext context)
    +        throws IOException, InterruptedException {
    +
    +      this.identifier = context.getConfiguration().get(RECORD_IDENTIFIER);
    +
    +      if (this.identifier == null || identifier.isEmpty()) {
    +        throw new InvalidParameterException(JsonInputFormat.RECORD_IDENTIFIER + " is not set.");
    +      } else {
    +        LOG.info("Initializing JsonRecordReader with identifier " + identifier);
    +      }
    +
    +      FileSplit fSplit = (FileSplit) split;
    +
    +      // get relevant data
    +      Path file = fSplit.getPath();
    +
    +      log.info("File is " + file);
    +
    +      start = fSplit.getStart();
    +      end = start + split.getLength();
    +      toRead = end - start;
    +
    +      FSDataInputStream strm = FileSystem.get(context.getConfiguration()).open(file);
    +
    +      if (start != 0) {
    +        strm.seek(start);
    +      }
    +
    +      rdr = new JsonStreamReader(identifier, new BufferedInputStream(strm));
    +    }
    +
    +    @Override public boolean nextKeyValue() throws IOException, InterruptedException {
    +      boolean retval = false;
    +      boolean keepGoing = false;
    +      do {
    +        keepGoing = false;
    +        String record = rdr.getJsonRecord();
    +        if (record != null) {
    +          if (JsonInputFormat.decodeLineToJsonNode(record) == null) {
    +            keepGoing = true;
    +          } else {
    +            outJson.set(record);
    +            outKey.set(rdr.getBytesRead());
    +            retval = true;
    +          }
    +        }
    +      } while (keepGoing);
    +
    +      return retval;
    +    }
    +
    +    @Override public void close() throws IOException {
    +      rdr.close();
    +    }
    +
    +    @Override public float getProgress() throws IOException {
    +      return (float) rdr.getBytesRead() / toRead;
    +    }
    +
    +    @Override public LongWritable getCurrentKey() throws IOException, InterruptedException {
    --- End diff --
   
    ok. done


---

[GitHub] carbondata pull request #2384: [CARBONDATA-2608] SDK Support JSON data loadi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ajantha-bhat commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2384#discussion_r197638059
 
    --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java ---
    @@ -0,0 +1,146 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.sdk.file;
    +
    +import java.io.IOException;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Random;
    +import java.util.UUID;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
    +import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    +import org.apache.carbondata.util.JsonCarbonUtil;
    +
    +import com.fasterxml.jackson.core.type.TypeReference;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.io.NullWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.mapreduce.JobID;
    +import org.apache.hadoop.mapreduce.RecordWriter;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.TaskAttemptID;
    +import org.apache.hadoop.mapreduce.TaskID;
    +import org.apache.hadoop.mapreduce.TaskType;
    +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
    +
    +/**
    + * Writer Implementation to write Json Record to carbondata file.
    + * json writer requires the path of json file and carbon schema.
    + */
    +@InterfaceAudience.User public class JsonCarbonWriter extends CarbonWriter {
    +  private RecordWriter<NullWritable, ObjectArrayWritable> recordWriter;
    +  private TaskAttemptContext context;
    +  private ObjectArrayWritable writable;
    +  private Schema carbonSchema;
    +
    +  JsonCarbonWriter(CarbonLoadModel loadModel, Schema carbonSchema) throws IOException {
    +    Configuration outputHadoopConf = new Configuration();
    +    CarbonTableOutputFormat.setLoadModel(outputHadoopConf, loadModel);
    +    CarbonTableOutputFormat outputFormat = new CarbonTableOutputFormat();
    +    JobID jobId = new JobID(UUID.randomUUID().toString(), 0);
    +    Random random = new Random();
    +    TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt());
    +    TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt());
    +    TaskAttemptContextImpl context = new TaskAttemptContextImpl(outputHadoopConf, attemptID);
    +    this.recordWriter = outputFormat.getRecordWriter(context);
    +    this.context = context;
    +    this.writable = new ObjectArrayWritable();
    +    this.carbonSchema = carbonSchema;
    +  }
    +
    +  /**
    +   * Writes a single row of data, accepted as one JSON string.
    +   *
    +   * @param object the JSON row as a string
    +   * @throws IOException if the row cannot be parsed or written
    +   */
    +  @Override public void write(Object object) throws IOException {
    --- End diff --
   
    ok. done
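
    For anyone following this thread, here is a minimal sketch of how the writer
    above is meant to be driven from the SDK. The builder entry point
    buildWriterForJsonInput and the string-typed Field constructor are assumptions
    based on this PR, not confirmed API:

        // Hedged sketch: buildWriterForJsonInput is the entry point this PR
        // proposes; treat the exact name and signature as an assumption.
        import org.apache.carbondata.sdk.file.CarbonWriter;
        import org.apache.carbondata.sdk.file.Field;
        import org.apache.carbondata.sdk.file.Schema;

        public class JsonWriteSketch {
          public static void main(String[] args) throws Exception {
            Field[] fields = new Field[] {
                new Field("name", "string"),  // assumed (name, type-string) constructor
                new Field("age", "int")
            };
            CarbonWriter writer = CarbonWriter.builder()
                .outputPath("/tmp/carbon_json_out")
                .buildWriterForJsonInput(new Schema(fields));  // hypothetical builder method
            writer.write("{\"name\": \"bob\", \"age\": 24}");  // one JSON row per call
            writer.close();
          }
        }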


---

[GitHub] carbondata pull request #2384: [CARBONDATA-2608] SDK Support JSON data loadi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ajantha-bhat commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2384#discussion_r197638053
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/jsoninput/JsonInputFormat.java ---
    @@ -0,0 +1,285 @@
    +package org.apache.carbondata.processing.loading.jsoninput;
    +
    +import java.io.BufferedInputStream;
    +import java.io.IOException;
    +import java.security.InvalidParameterException;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.LongWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.Job;
    +import org.apache.hadoop.mapreduce.RecordReader;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    +import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    +import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
    +import org.apache.log4j.Logger;
    +import org.codehaus.jackson.JsonFactory;
    +import org.codehaus.jackson.JsonNode;
    +import org.codehaus.jackson.JsonParseException;
    +import org.codehaus.jackson.map.JsonMappingException;
    +import org.codehaus.jackson.map.ObjectMapper;
    +
    +/**
    + * Code ported from the Hydra-Spark package {@code com.pluralsight.hydra.hadoop.io}.
    + * The JsonInputFormat reads two types of JSON-formatted data. The default
    + * expectation is that each JSON record is newline-delimited. This method is
    + * generally faster and is backed by the {@link LineRecordReader} you are likely
    + * familiar with. The other method handles 'pretty printed' JSON records, where
    + * records span multiple lines and often have some type of root identifier. This
    + * method is likely slower, but respects record boundaries much like the
    + * LineRecordReader.<br>
    + * <br>
    + * Use of the 'pretty print' reader requires a record identifier.
    + */
    +public class JsonInputFormat extends FileInputFormat<LongWritable, Text> {
    +
    +  private static final Logger LOG = Logger.getLogger(JsonInputFormat.class);
    +
    +  private static JsonFactory factory = new JsonFactory();
    +
    +  private static ObjectMapper mapper = new ObjectMapper(factory);
    +
    +  public static final String ONE_RECORD_PER_LINE = "json.input.format.one.record.per.line";
    +
    +  public static final String RECORD_IDENTIFIER = "json.input.format.record.identifier";
    +
    +  @Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
    +      TaskAttemptContext context) throws IOException, InterruptedException {
    +    RecordReader<LongWritable, Text> rdr;
    +    if (context.getConfiguration().getBoolean(ONE_RECORD_PER_LINE, false)) {
    +      rdr = new SimpleJsonRecordReader();
    +    } else {
    +      return new JsonRecordReader();
    +    }
    +    rdr.initialize(split, context);
    +    return rdr;
    +  }
    +
    +  /**
    +   * This class uses the {@link LineRecordReader} to read a line of JSON and
    +   * return it as a Text object.
    +   */
    +  public static class SimpleJsonRecordReader extends RecordReader<LongWritable, Text> {
    +
    +    private LineRecordReader rdr = null;
    +
    +    private LongWritable outkey = new LongWritable(0L);
    +
    +    private Text outvalue = new Text();
    +
    +    @Override public void initialize(InputSplit split, TaskAttemptContext context)
    +        throws IOException, InterruptedException {
    +
    +      rdr = new LineRecordReader();
    +      rdr.initialize(split, context);
    +    }
    +
    +    @Override public boolean nextKeyValue() throws IOException, InterruptedException {
    +      if (rdr.nextKeyValue()) {
    +        outvalue.set(rdr.getCurrentValue());
    +        outkey.set(rdr.getCurrentKey().get());
    +        return true;
    +      } else {
    +        return false;
    +      }
    +    }
    +
    +    @Override public void close() throws IOException {
    +      rdr.close();
    +    }
    +
    +    @Override public float getProgress() throws IOException {
    +      return rdr.getProgress();
    +    }
    +
    +    @Override public LongWritable getCurrentKey() throws IOException, InterruptedException {
    +      return outkey;
    +    }
    +
    +    @Override public Text getCurrentValue() throws IOException, InterruptedException {
    +      return outvalue;
    +    }
    +  }
    +
    +  /**
    +   * This class uses the {@link JsonStreamReader} to read JSON records from a
    +   * file. It respects split boundaries to complete full JSON records, as
    +   * specified by the root identifier. This class will discard any records
    +   * that it was unable to decode using
    +   * {@link JsonInputFormat#decodeLineToJsonNode(String)}
    +   */
    +  public static class JsonRecordReader extends RecordReader<LongWritable, Text> {
    +
    +    private Logger LOG = Logger.getLogger(JsonRecordReader.class);
    +
    +    private JsonStreamReader rdr = null;
    +
    +    private long start = 0, end = 0;
    +
    +    private float toRead = 0;
    +
    +    private String identifier = null;
    +
    +    private Text outJson = new Text();
    +
    +    private LongWritable outKey = new LongWritable();
    +
    +    @Override public void initialize(InputSplit split, TaskAttemptContext context)
    +        throws IOException, InterruptedException {
    +
    +      this.identifier = context.getConfiguration().get(RECORD_IDENTIFIER);
    +
    +      if (this.identifier == null || identifier.isEmpty()) {
    +        throw new InvalidParameterException(JsonInputFormat.RECORD_IDENTIFIER + " is not set.");
    +      } else {
    +        LOG.info("Initializing JsonRecordReader with identifier " + identifier);
    +      }
    +
    +      FileSplit fSplit = (FileSplit) split;
    +
    +      // get relevant data
    +      Path file = fSplit.getPath();
    +
    +      LOG.info("File is " + file);
    +
    +      start = fSplit.getStart();
    +      end = start + split.getLength();
    +      toRead = end - start;
    +
    +      FSDataInputStream strm = FileSystem.get(context.getConfiguration()).open(file);
    +
    +      if (start != 0) {
    +        strm.seek(start);
    +      }
    +
    +      rdr = new JsonStreamReader(identifier, new BufferedInputStream(strm));
    +    }
    +
    +    @Override public boolean nextKeyValue() throws IOException, InterruptedException {
    +      boolean retval = false;
    +      boolean keepGoing = false;
    +      do {
    +        keepGoing = false;
    +        String record = rdr.getJsonRecord();
    +        if (record != null) {
    +          if (JsonInputFormat.decodeLineToJsonNode(record) == null) {
    +            keepGoing = true;
    +          } else {
    +            outJson.set(record);
    +            outKey.set(rdr.getBytesRead());
    +            retval = true;
    +          }
    +        }
    +      } while (keepGoing);
    +
    +      return retval;
    +    }
    +
    +    @Override public void close() throws IOException {
    +      rdr.close();
    +    }
    +
    +    @Override public float getProgress() throws IOException {
    +      return (float) rdr.getBytesRead() / toRead;
    +    }
    +
    +    @Override public LongWritable getCurrentKey() throws IOException, InterruptedException {
    +      return outKey;
    +    }
    +
    +    @Override public Text getCurrentValue() throws IOException, InterruptedException {
    +      return outJson;
    +    }
    +  }
    +
    +  /**
    +   * Decodes a given string of text to a {@link JsonNode}.
    +   *
    +   * @param line The line of text
    +   * @return The JsonNode or null if a JsonParseException,
    +   * JsonMappingException, or IOException error occurs
    +   */
    +  public static synchronized JsonNode decodeLineToJsonNode(String line) {
    +
    +    try {
    +      return mapper.readTree(line);
    +    } catch (JsonParseException e) {
    +      LOG.warn("Could not parse line to JSON: " + line, e);
    +      return null;
    +    } catch (JsonMappingException e) {
    +      LOG.warn("Could not map line to JSON: " + line, e);
    --- End diff --
   
    ok. done
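
    As context for the two configuration keys quoted above, this is roughly how a
    MapReduce job would select between the two readers (standard Hadoop API; the
    identifier value "jsonData" is only illustrative):

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.mapreduce.Job;
        import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

        import org.apache.carbondata.processing.loading.jsoninput.JsonInputFormat;

        public class JsonInputJobSketch {
          public static Job newJob(String inputDir) throws Exception {
            Configuration conf = new Configuration();
            // Newline-delimited JSON: fast path backed by LineRecordReader.
            conf.setBoolean(JsonInputFormat.ONE_RECORD_PER_LINE, true);
            // For multi-line ("pretty printed") JSON, leave the flag false and set
            // the mandatory root identifier instead, e.g.:
            // conf.set(JsonInputFormat.RECORD_IDENTIFIER, "jsonData");
            Job job = Job.getInstance(conf, "json-load-sketch");
            job.setInputFormatClass(JsonInputFormat.class);
            FileInputFormat.addInputPath(job, new Path(inputDir));
            return job;
          }
        }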


---

[GitHub] carbondata pull request #2384: [CARBONDATA-2608] SDK Support JSON data loadi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ajantha-bhat commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2384#discussion_r197638100
 
    --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java ---
    @@ -0,0 +1,146 @@
    +package org.apache.carbondata.sdk.file;
    +
    +import java.io.IOException;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Random;
    +import java.util.UUID;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
    +import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    +import org.apache.carbondata.util.JsonCarbonUtil;
    +
    +import com.fasterxml.jackson.core.type.TypeReference;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.io.NullWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.mapreduce.JobID;
    +import org.apache.hadoop.mapreduce.RecordWriter;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.TaskAttemptID;
    +import org.apache.hadoop.mapreduce.TaskID;
    +import org.apache.hadoop.mapreduce.TaskType;
    +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
    +
    +/**
    + * Writer implementation to write JSON records to a CarbonData file.
    + * The JSON writer requires the path of the JSON file and the carbon schema.
    + */
    +@InterfaceAudience.User public class JsonCarbonWriter extends CarbonWriter {
    +  private RecordWriter<NullWritable, ObjectArrayWritable> recordWriter;
    +  private TaskAttemptContext context;
    +  private ObjectArrayWritable writable;
    +  private Schema carbonSchema;
    +
    +  JsonCarbonWriter(CarbonLoadModel loadModel, Schema carbonSchema) throws IOException {
    +    Configuration outputHadoopConf = new Configuration();
    +    CarbonTableOutputFormat.setLoadModel(outputHadoopConf, loadModel);
    +    CarbonTableOutputFormat outputFormat = new CarbonTableOutputFormat();
    +    JobID jobId = new JobID(UUID.randomUUID().toString(), 0);
    +    Random random = new Random();
    +    TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt());
    +    TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt());
    +    TaskAttemptContextImpl context = new TaskAttemptContextImpl(outputHadoopConf, attemptID);
    +    this.recordWriter = outputFormat.getRecordWriter(context);
    +    this.context = context;
    +    this.writable = new ObjectArrayWritable();
    +    this.carbonSchema = carbonSchema;
    +  }
    +
    +  /**
    +   * Writes a single row of data, accepted as one JSON string.
    +   *
    +   * @param object the JSON row as a string
    +   * @throws IOException if the row cannot be parsed or written
    +   */
    +  @Override public void write(Object object) throws IOException {
    +    Objects.requireNonNull(object, "Input cannot be null");
    +    try {
    +      Map<String, Object> jsonNodeMap;
    +      ObjectMapper objectMapper = new ObjectMapper();
    +      try {
    +        jsonNodeMap =
    +            objectMapper.readValue((String) object, new TypeReference<Map<String, Object>>() {
    +            });
    +      } catch (IOException e) {
    +        throw new IOException("Failed to parse JSON row string", e);
    +      }
    +      // convert json object to carbon object.
    +      Object[] writeObjects = JsonCarbonUtil.jsonToCarbonRecord(jsonNodeMap, carbonSchema);
    --- End diff --
   
    ok. done
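
    The parse step flagged above is a plain Jackson call; isolated, it looks like
    this (self-contained sketch):

        import java.util.Map;

        import com.fasterxml.jackson.core.type.TypeReference;
        import com.fasterxml.jackson.databind.ObjectMapper;

        public class JsonRowParseSketch {
          public static void main(String[] args) throws Exception {
            String row = "{\"name\": \"bob\", \"age\": 24}";
            // Same call shape as in JsonCarbonWriter.write(): one JSON object -> Map.
            Map<String, Object> jsonNodeMap = new ObjectMapper()
                .readValue(row, new TypeReference<Map<String, Object>>() { });
            System.out.println(jsonNodeMap);  // prints {name=bob, age=24}
          }
        }

    One design note: ObjectMapper is thread-safe and relatively expensive to
    construct, so hoisting it out of write() into a field would avoid a per-row
    allocation.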


---

[GitHub] carbondata pull request #2384: [CARBONDATA-2608] SDK Support JSON data loadi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ajantha-bhat commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2384#discussion_r197638101
 
    --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java ---
    @@ -0,0 +1,146 @@
    +package org.apache.carbondata.sdk.file;
    +
    +import java.io.IOException;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Random;
    +import java.util.UUID;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
    +import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    +import org.apache.carbondata.util.JsonCarbonUtil;
    +
    +import com.fasterxml.jackson.core.type.TypeReference;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.io.NullWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.mapreduce.JobID;
    +import org.apache.hadoop.mapreduce.RecordWriter;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.TaskAttemptID;
    +import org.apache.hadoop.mapreduce.TaskID;
    +import org.apache.hadoop.mapreduce.TaskType;
    +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
    +
    +/**
    + * Writer implementation to write JSON records to a CarbonData file.
    + * The JSON writer requires the path of the JSON file and the carbon schema.
    + */
    +@InterfaceAudience.User public class JsonCarbonWriter extends CarbonWriter {
    +  private RecordWriter<NullWritable, ObjectArrayWritable> recordWriter;
    +  private TaskAttemptContext context;
    +  private ObjectArrayWritable writable;
    +  private Schema carbonSchema;
    +
    +  JsonCarbonWriter(CarbonLoadModel loadModel, Schema carbonSchema) throws IOException {
    +    Configuration outputHadoopConf = new Configuration();
    +    CarbonTableOutputFormat.setLoadModel(outputHadoopConf, loadModel);
    +    CarbonTableOutputFormat outputFormat = new CarbonTableOutputFormat();
    +    JobID jobId = new JobID(UUID.randomUUID().toString(), 0);
    +    Random random = new Random();
    +    TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt());
    +    TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt());
    +    TaskAttemptContextImpl context = new TaskAttemptContextImpl(outputHadoopConf, attemptID);
    +    this.recordWriter = outputFormat.getRecordWriter(context);
    +    this.context = context;
    +    this.writable = new ObjectArrayWritable();
    +    this.carbonSchema = carbonSchema;
    +  }
    +
    +  /**
    +   * Writes a single row of data, accepted as one JSON string.
    +   *
    +   * @param object the JSON row as a string
    +   * @throws IOException if the row cannot be parsed or written
    +   */
    +  @Override public void write(Object object) throws IOException {
    +    Objects.requireNonNull(object, "Input cannot be null");
    +    try {
    +      Map<String, Object> jsonNodeMap;
    +      ObjectMapper objectMapper = new ObjectMapper();
    +      try {
    +        jsonNodeMap =
    +            objectMapper.readValue((String) object, new TypeReference<Map<String, Object>>() {
    +            });
    +      } catch (IOException e) {
    +        throw new IOException("Failed to parse JSON row string", e);
    +      }
    +      // convert json object to carbon object.
    +      Object[] writeObjects = JsonCarbonUtil.jsonToCarbonRecord(jsonNodeMap, carbonSchema);
    +      writable.set(writeObjects);
    +      recordWriter.write(NullWritable.get(), writable);
    +    } catch (Exception e) {
    +      close();
    +      throw new IOException(e);
    +    }
    +  }
    +
    +  /**
    +   * Takes a file or directory path containing an array of JSON rows and
    +   * writes them out as CarbonData files.
    +   *
    +   * @param inputFilePath path of the input JSON file or directory
    +   * @param recordIdentifier root identifier of each JSON record
    +   * @throws IOException
    +   */
    +  public void writeFromJsonFile(String inputFilePath, String recordIdentifier) throws IOException {
    --- End diff --
   
    removed it.
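
    With writeFromJsonFile removed, file-driven loading becomes the caller's job.
    A sketch for the simple newline-delimited case, assuming writer was built for
    JSON input as above:

        import java.io.IOException;
        import java.nio.file.Files;
        import java.nio.file.Paths;
        import java.util.stream.Stream;

        import org.apache.carbondata.sdk.file.CarbonWriter;

        public class JsonFileLoadSketch {
          public static void loadLineDelimitedJson(CarbonWriter writer, String inputFilePath)
              throws IOException {
            try (Stream<String> rows = Files.lines(Paths.get(inputFilePath))) {
              for (String row : (Iterable<String>) rows::iterator) {
                writer.write(row);  // one JSON object per line
              }
            }
          }
        }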


---

[GitHub] carbondata pull request #2384: [CARBONDATA-2608] SDK Support JSON data loadi...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ajantha-bhat commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2384#discussion_r197638106
 
    --- Diff: store/sdk/src/main/java/org/apache/carbondata/util/JsonCarbonUtil.java ---
    @@ -0,0 +1,197 @@
    +package org.apache.carbondata.util;
    +
    +import java.io.IOException;
    +import java.math.BigDecimal;
    +import java.text.ParseException;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator;
    +import org.apache.carbondata.core.metadata.datatype.ArrayType;
    +import org.apache.carbondata.core.metadata.datatype.DataType;
    +import org.apache.carbondata.core.metadata.datatype.DataTypes;
    +import org.apache.carbondata.core.metadata.datatype.StructField;
    +import org.apache.carbondata.core.metadata.datatype.StructType;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.processing.loading.complexobjects.ArrayObject;
    +import org.apache.carbondata.processing.loading.complexobjects.StructObject;
    +import org.apache.carbondata.sdk.file.Field;
    +import org.apache.carbondata.sdk.file.Schema;
    +
    +public class JsonCarbonUtil {
    +
    +  public static Object[] jsonToCarbonRecord(Map<String, Object> jsonNodeMap, Schema carbonSchema)
    --- End diff --
   
    ok. done
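
    For readers: the conversion above essentially has to place JSON values into
    the schema's column order. A generic sketch of that step, deliberately
    independent of the SDK's actual Schema/Field accessors (fieldNames stands in
    for whatever the schema exposes):

        import java.util.List;
        import java.util.Map;

        public class JsonToRecordSketch {
          // Orders JSON values by schema column order; a key missing from the
          // map simply becomes a null cell in the output row.
          public static Object[] toRecord(Map<String, Object> jsonNodeMap,
              List<String> fieldNames) {
            Object[] record = new Object[fieldNames.size()];
            for (int i = 0; i < fieldNames.size(); i++) {
              record[i] = jsonNodeMap.get(fieldNames.get(i));
            }
            return record;
          }
        }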


---

[GitHub] carbondata issue #2384: [CARBONDATA-2608] SDK Support JSON data loading dire...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ajantha-bhat commented on the issue:

    https://github.com/apache/carbondata/pull/2384
 
    @ravipesala: please review, the comments are addressed.


---