Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2384#discussion_r197350068 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.file; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.Random; +import java.util.UUID; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hadoop.internal.ObjectArrayWritable; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import org.apache.carbondata.util.JsonCarbonUtil; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; + +/** + * Writer Implementation to write Json Record to carbondata file. + * json writer requires the path of json file and carbon schema. 
+ */ +@InterfaceAudience.User public class JsonCarbonWriter extends CarbonWriter { + private RecordWriter<NullWritable, ObjectArrayWritable> recordWriter; + private TaskAttemptContext context; + private ObjectArrayWritable writable; + private Schema carbonSchema; + + JsonCarbonWriter(CarbonLoadModel loadModel, Schema carbonSchema) throws IOException { + Configuration OutputHadoopConf = new Configuration(); + CarbonTableOutputFormat.setLoadModel(OutputHadoopConf, loadModel); + CarbonTableOutputFormat outputFormat = new CarbonTableOutputFormat(); + JobID jobId = new JobID(UUID.randomUUID().toString(), 0); + Random random = new Random(); + TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt()); + TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt()); + TaskAttemptContextImpl context = new TaskAttemptContextImpl(OutputHadoopConf, attemptID); + this.recordWriter = outputFormat.getRecordWriter(context); + this.context = context; + this.writable = new ObjectArrayWritable(); + this.carbonSchema = carbonSchema; + } + + /** + * Write single row data, accepts one row of data as json string + * + * @param object (json row as a string) + * @throws IOException + */ + @Override public void write(Object object) throws IOException { --- End diff -- This class should directly use the JsonInputFormat readers; it should not depend directly on the Jackson parser. ---
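To make the wiring in the quoted constructor easier to follow, here is a minimal standalone sketch of the same pattern: obtaining a Hadoop RecordWriter from CarbonTableOutputFormat outside a real MapReduce job by fabricating a task attempt. The helper name CarbonRecordWriterFactory is illustrative, not part of the PR.

import java.io.IOException;
import java.util.Random;
import java.util.UUID;

import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

final class CarbonRecordWriterFactory {
  /** Builds a RecordWriter without a surrounding MapReduce job by faking a task attempt. */
  static RecordWriter<NullWritable, ObjectArrayWritable> create(CarbonLoadModel loadModel)
      throws IOException, InterruptedException {
    Configuration conf = new Configuration();
    CarbonTableOutputFormat.setLoadModel(conf, loadModel);
    // The IDs carry no meaning here; they only satisfy the TaskAttemptContext contract.
    JobID jobId = new JobID(UUID.randomUUID().toString(), 0);
    TaskID taskId = new TaskID(jobId, TaskType.MAP, new Random().nextInt());
    TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID(taskId, 0));
    return new CarbonTableOutputFormat().getRecordWriter(context);
  }
}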
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2384#discussion_r197350235 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.file; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.Random; +import java.util.UUID; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hadoop.internal.ObjectArrayWritable; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import org.apache.carbondata.util.JsonCarbonUtil; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; + +/** + * Writer Implementation to write Json Record to carbondata file. + * json writer requires the path of json file and carbon schema. 
+ */ +@InterfaceAudience.User public class JsonCarbonWriter extends CarbonWriter { + private RecordWriter<NullWritable, ObjectArrayWritable> recordWriter; + private TaskAttemptContext context; + private ObjectArrayWritable writable; + private Schema carbonSchema; + + JsonCarbonWriter(CarbonLoadModel loadModel, Schema carbonSchema) throws IOException { + Configuration OutputHadoopConf = new Configuration(); + CarbonTableOutputFormat.setLoadModel(OutputHadoopConf, loadModel); + CarbonTableOutputFormat outputFormat = new CarbonTableOutputFormat(); + JobID jobId = new JobID(UUID.randomUUID().toString(), 0); + Random random = new Random(); + TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt()); + TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt()); + TaskAttemptContextImpl context = new TaskAttemptContextImpl(OutputHadoopConf, attemptID); + this.recordWriter = outputFormat.getRecordWriter(context); + this.context = context; + this.writable = new ObjectArrayWritable(); + this.carbonSchema = carbonSchema; + } + + /** + * Write single row data, accepts one row of data as json string + * + * @param object (json row as a string) + * @throws IOException + */ + @Override public void write(Object object) throws IOException { + Objects.requireNonNull(object, "Input cannot be null"); + try { + Map<String, Object> jsonNodeMap; + ObjectMapper objectMapper = new ObjectMapper(); + try { + jsonNodeMap = + objectMapper.readValue((String) object, new TypeReference<Map<String, Object>>() { + }); + } catch (IOException e) { + throw new IOException("Failed to parse Json row string "); + } + // convert json object to carbon object. + Object[] writeObjects = JsonCarbonUtil.jsonToCarbonRecord(jsonNodeMap, carbonSchema); --- End diff -- It should be like CSV, so it should not convert to data types here; it should include the DataConverterStep. ---
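As a side note on the parsing step the quoted write() performs, here is a sketch of the same Jackson call, reusing a single ObjectMapper (mapper construction is relatively expensive to repeat per row) and preserving the parse failure as the exception cause; the class name JsonRowParser is hypothetical.

import java.io.IOException;
import java.util.Map;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

final class JsonRowParser {
  // ObjectMapper is thread-safe for reads, so one shared instance suffices
  // instead of creating a new mapper for every row as the quoted write() does.
  private static final ObjectMapper MAPPER = new ObjectMapper();

  static Map<String, Object> parse(String jsonRow) throws IOException {
    try {
      return MAPPER.readValue(jsonRow, new TypeReference<Map<String, Object>>() { });
    } catch (IOException e) {
      // Keep the original exception as the cause so parse errors stay diagnosable.
      throw new IOException("Failed to parse JSON row", e);
    }
  }
}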
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2384#discussion_r197350279 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.file; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.Random; +import java.util.UUID; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hadoop.internal.ObjectArrayWritable; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import org.apache.carbondata.util.JsonCarbonUtil; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; + +/** + * Writer Implementation to write Json Record to carbondata file. + * json writer requires the path of json file and carbon schema. 
+ */ +@InterfaceAudience.User public class JsonCarbonWriter extends CarbonWriter { + private RecordWriter<NullWritable, ObjectArrayWritable> recordWriter; + private TaskAttemptContext context; + private ObjectArrayWritable writable; + private Schema carbonSchema; + + JsonCarbonWriter(CarbonLoadModel loadModel, Schema carbonSchema) throws IOException { + Configuration OutputHadoopConf = new Configuration(); + CarbonTableOutputFormat.setLoadModel(OutputHadoopConf, loadModel); + CarbonTableOutputFormat outputFormat = new CarbonTableOutputFormat(); + JobID jobId = new JobID(UUID.randomUUID().toString(), 0); + Random random = new Random(); + TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt()); + TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt()); + TaskAttemptContextImpl context = new TaskAttemptContextImpl(OutputHadoopConf, attemptID); + this.recordWriter = outputFormat.getRecordWriter(context); + this.context = context; + this.writable = new ObjectArrayWritable(); + this.carbonSchema = carbonSchema; + } + + /** + * Write single row data, accepts one row of data as json string + * + * @param object (json row as a string) + * @throws IOException + */ + @Override public void write(Object object) throws IOException { + Objects.requireNonNull(object, "Input cannot be null"); + try { + Map<String, Object> jsonNodeMap; + ObjectMapper objectMapper = new ObjectMapper(); + try { + jsonNodeMap = + objectMapper.readValue((String) object, new TypeReference<Map<String, Object>>() { + }); + } catch (IOException e) { + throw new IOException("Failed to parse Json row string "); + } + // convert json object to carbon object. + Object[] writeObjects = JsonCarbonUtil.jsonToCarbonRecord(jsonNodeMap, carbonSchema); + writable.set(writeObjects); + recordWriter.write(NullWritable.get(), writable); + } catch (Exception e) { + close(); + throw new IOException(e); + } + } + + /** + * Takes file or directory path, + * containing array of json rows to write carbondata files. + * + * @param inputFilePath + * @param recordIdentifier + * @throws IOException + */ + public void writeFromJsonFile(String inputFilePath, String recordIdentifier) throws IOException { --- End diff -- It should not be here. ---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2384#discussion_r197350393 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonReader.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.file; + +import java.io.IOException; +import java.util.List; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.core.util.CarbonTaskInfo; +import org.apache.carbondata.core.util.ThreadLocalTaskInfo; + +import org.apache.hadoop.mapreduce.RecordReader; + +/** + * Reader for JsonFile that uses JsonInputFormat with jackson parser + */ +@InterfaceAudience.User +@InterfaceStability.Evolving +public class JsonReader<T> { --- End diff -- why again one more reader, why don't you use JsonInputFormat reader --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2384#discussion_r197350528 --- Diff: store/sdk/src/main/java/org/apache/carbondata/util/JsonCarbonUtil.java --- @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.util; + +import java.io.IOException; +import java.math.BigDecimal; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator; +import org.apache.carbondata.core.metadata.datatype.ArrayType; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.datatype.StructField; +import org.apache.carbondata.core.metadata.datatype.StructType; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.processing.loading.complexobjects.ArrayObject; +import org.apache.carbondata.processing.loading.complexobjects.StructObject; +import org.apache.carbondata.sdk.file.Field; +import org.apache.carbondata.sdk.file.Schema; + +public class JsonCarbonUtil { + + + public static Object[] jsonToCarbonRecord(Map<String, Object> jsonNodeMap, Schema carbonSchema) --- End diff -- It should directly use the converter step of the load. ---
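The quoted utility is not shown in full, but a conversion of this kind essentially reorders JSON values into the schema's field order. A simplified sketch follows (primitive fields only, and assuming the SDK's Schema#getFields and Field#getFieldName accessors):

import java.util.Map;

import org.apache.carbondata.sdk.file.Field;
import org.apache.carbondata.sdk.file.Schema;

final class JsonToCarbonRow {
  /** Orders JSON values by schema field order; complex and typed fields would need recursion. */
  static Object[] toRow(Map<String, Object> json, Schema schema) {
    Field[] fields = schema.getFields();
    Object[] row = new Object[fields.length];
    for (int i = 0; i < fields.length; i++) {
      row[i] = json.get(fields[i].getFieldName()); // null when the column is absent
    }
    return row;
  }
}

With the converter step the reviewer suggests, this eager data-type conversion would be unnecessary: raw values could flow into the load pipeline as-is, the way the CSV path works.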
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2384 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6465/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2384 Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5296/ ---
Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2384#discussion_r197367547 --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/jsoninput/JsonInputFormat.java --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.processing.loading.jsoninput; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.security.InvalidParameterException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; +import org.apache.log4j.Logger; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; + +/** + * Code ported from Hydra-Spark {package com.pluralsight.hydra.hadoop.io} package + * The JsonInputFormat will read two types of JSON formatted data. The default + * expectation is each JSON record is newline delimited. This method is + * generally faster and is backed by the {@link LineRecordReader} you are likely + * familiar with. The other method is 'pretty print' of JSON records, where + * records span multiple lines and often have some type of root identifier. This + * method is likely slower, but respects record boundaries much like the + * LineRecordReader.<br> + * <br> + * Use of the 'pretty print' reader requires a record identifier. 
+ */ +public class JsonInputFormat extends FileInputFormat<LongWritable, Text> { + + private static JsonFactory factory = new JsonFactory(); + + private static ObjectMapper mapper = new ObjectMapper(factory); + + public static final String ONE_RECORD_PER_LINE = "json.input.format.one.record.per.line"; + + public static final String RECORD_IDENTIFIER = "json.input.format.record.identifier"; + + @Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, + TaskAttemptContext context) throws IOException, InterruptedException { + RecordReader<LongWritable, Text> rdr; + if (context.getConfiguration().getBoolean(ONE_RECORD_PER_LINE, false)) { + rdr = new SimpleJsonRecordReader(); + } else { + return new JsonRecordReader(); + } + rdr.initialize(split, context); + return rdr; + } + + /** + * This class uses the {@link LineRecordReader} to read a line of JSON and + * return it as a Text object. + */ + public static class SimpleJsonRecordReader extends RecordReader<LongWritable, Text> { + + private LineRecordReader rdr = null; + + private LongWritable outkey = new LongWritable(0L); + + private Text outvalue = new Text(); + + @Override public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + + rdr = new LineRecordReader(); + rdr.initialize(split, context); + } + + @Override public boolean nextKeyValue() throws IOException, InterruptedException { + if (rdr.nextKeyValue()) { + outvalue.set(rdr.getCurrentValue()); + outkey.set(rdr.getCurrentKey().get()); + return true; + } else { + return false; + } + } + + @Override public void close() throws IOException { + rdr.close(); + } + + @Override public float getProgress() throws IOException { + return rdr.getProgress(); + } + + @Override public LongWritable getCurrentKey() throws IOException, InterruptedException { + return outkey; + } + + @Override public Text getCurrentValue() throws IOException, InterruptedException { + return outvalue; + } + } + + /** + * This class uses the {@link JsonStreamReader} to read JSON records from a + * file. It respects split boundaries to complete full JSON records, as + * specified by the root identifier. 
This class will discard any records + * that it was unable to decode using + * {@link JsonInputFormat#decodeLineToJsonNode(String)} + */ + public static class JsonRecordReader extends RecordReader<LongWritable, Text> { + + private Logger LOG = Logger.getLogger(JsonRecordReader.class); + + private JsonStreamReader rdr = null; + + private long start = 0, end = 0; + + private float toRead = 0; + + private String identifier = null; + + private Logger log = Logger.getLogger(JsonRecordReader.class); + + private Text outJson = new Text(); + + private LongWritable outKey = new LongWritable(); + + @Override public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + + this.identifier = context.getConfiguration().get(RECORD_IDENTIFIER); + + if (this.identifier == null || identifier.isEmpty()) { + throw new InvalidParameterException(JsonInputFormat.RECORD_IDENTIFIER + " is not set."); + } else { + LOG.info("Initializing JsonRecordReader with identifier " + identifier); + } + + FileSplit fSplit = (FileSplit) split; + + // get relevant data + Path file = fSplit.getPath(); + + log.info("File is " + file); + + start = fSplit.getStart(); + end = start + split.getLength(); + toRead = end - start; + + FSDataInputStream strm = FileSystem.get(context.getConfiguration()).open(file); + + if (start != 0) { + strm.seek(start); + } + + rdr = new JsonStreamReader(identifier, new BufferedInputStream(strm)); + } + + @Override public boolean nextKeyValue() throws IOException, InterruptedException { + boolean retval = false; + boolean keepGoing = false; + do { + keepGoing = false; + String record = rdr.getJsonRecord(); + if (record != null) { + if (JsonInputFormat.decodeLineToJsonNode(record) == null) { + keepGoing = true; + } else { + outJson.set(record); + outKey.set(rdr.getBytesRead()); + retval = true; + } + } + } while (keepGoing); + + return retval; + } + + @Override public void close() throws IOException { + rdr.close(); + } + + @Override public float getProgress() throws IOException { + return (float) rdr.getBytesRead() / toRead; + } + + @Override public LongWritable getCurrentKey() throws IOException, InterruptedException { + return outKey; + } + + @Override public Text getCurrentValue() throws IOException, InterruptedException { + return outJson; + } + } + + /** + * Decodes a given string of text to a {@link JsonNode}. + * + * @param line The line of text + * @return The JsonNode or null if a JsonParseException, + * JsonMappingException, or IOException error occurs + */ + public static synchronized JsonNode decodeLineToJsonNode(String line) { --- End diff -- Yes required, JsonRecordReader is a static class. so this method is referenced in that. --- |
Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2384#discussion_r197372247 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonReader.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.file; + +import java.io.IOException; +import java.util.List; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.core.util.CarbonTaskInfo; +import org.apache.carbondata.core.util.ThreadLocalTaskInfo; + +import org.apache.hadoop.mapreduce.RecordReader; + +/** + * Reader for JsonFile that uses JsonInputFormat with jackson parser + */ +@InterfaceAudience.User +@InterfaceStability.Evolving +public class JsonReader<T> { --- End diff -- This is just a wrapper; it uses the same record reader from JsonInputFormat. JsonReader is a wrapper over a list of JsonRecordReader instances from JsonInputFormat (one per split), just as CarbonReader wraps a list of CarbonRecordReader instances from CsvInputFormat, plus other wrapper iterators. ---
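A minimal sketch of the wrapper pattern being described: draining a list of per-split RecordReaders as one sequential stream. The class name and exact iteration contract are illustrative, not the PR's code.

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

import org.apache.hadoop.mapreduce.RecordReader;

final class ChainedReader<K, V> {
  private final Iterator<RecordReader<K, V>> readers;
  private RecordReader<K, V> current;
  private boolean advanced; // true when current already points at an unread record

  ChainedReader(List<RecordReader<K, V>> perSplitReaders) {
    this.readers = perSplitReaders.iterator();
    this.current = this.readers.hasNext() ? this.readers.next() : null;
  }

  boolean hasNext() throws IOException, InterruptedException {
    if (advanced) {
      return true;
    }
    while (current != null) {
      if (current.nextKeyValue()) {
        advanced = true;
        return true;
      }
      current.close(); // this split is drained; roll over to the next reader
      current = readers.hasNext() ? readers.next() : null;
    }
    return false;
  }

  V readNextRow() throws IOException, InterruptedException {
    if (!hasNext()) {
      throw new NoSuchElementException("no more records");
    }
    advanced = false;
    return current.getCurrentValue();
  }
}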
Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2384#discussion_r197638025 --- Diff: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java --- @@ -3116,5 +3117,48 @@ public static void setLocalDictColumnsToWrapperSchema(List<ColumnSchema> columns } } } -} + /** + * Utility function to read a whole file as a string, + * Must not use this if the file is very huge. As it may result in memory exhaustion. + * @param filePath + * @return + */ + public static String readFromFile(String filePath) { --- End diff -- ok. done --- |
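A possible hardening of the readFromFile utility discussed here, sketched below, is to propagate failures as IOException rather than returning an error string that callers could mistake for file content; the helper name is hypothetical.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

final class FileText {
  /** Reads a whole file as UTF-8; unsuitable for very large files. */
  static String readFully(String filePath) throws IOException {
    byte[] bytes = Files.readAllBytes(Paths.get(filePath));
    return new String(bytes, StandardCharsets.UTF_8);
  }
}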
Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2384#discussion_r197638028 --- Diff: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java --- @@ -3116,5 +3117,48 @@ public static void setLocalDictColumnsToWrapperSchema(List<ColumnSchema> columns } } } -} + /** + * Utility function to read a whole file as a string, + * Must not use this if the file is very huge. As it may result in memory exhaustion. + * @param filePath + * @return + */ + public static String readFromFile(String filePath) { + File file = new File(filePath); + URI uri = file.toURI(); + byte[] bytes; + try { + bytes = java.nio.file.Files.readAllBytes(java.nio.file.Paths.get(uri)); + } catch (IOException e) { + e.printStackTrace(); + return "ERROR loading file " + filePath; + } + try { + return new String(bytes, "UTF-8"); + } catch (UnsupportedEncodingException e) { + return "ERROR while encoding to UTF-8 for file " + filePath; + } + } + + /** + * get list of file paths with requested extensions from the path + * @param carbonFile + * @param jsonFileList + * @param fileExtension + */ + public static void getFileList(CarbonFile carbonFile, List<String> jsonFileList, --- End diff -- ok. done --- |
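The recursive walk that getFileList performs looks roughly like the following sketch, written against plain java.io.File rather than CarbonFile so it is self-contained:

import java.io.File;
import java.util.List;

final class FileLister {
  /** Collects absolute paths of files under dir whose names end with the given extension. */
  static void collect(File dir, String extension, List<String> out) {
    File[] children = dir.listFiles();
    if (children == null) { // not a directory, or an I/O error while listing
      return;
    }
    for (File child : children) {
      if (child.isDirectory()) {
        collect(child, extension, out); // recurse into subdirectories
      } else if (child.getName().endsWith(extension)) {
        out.add(child.getAbsolutePath());
      }
    }
  }
}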
Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2384#discussion_r197638029 --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/jsoninput/JsonInputFormat.java --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.processing.loading.jsoninput; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.security.InvalidParameterException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; +import org.apache.log4j.Logger; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; + +/** + * Code ported from Hydra-Spark {package com.pluralsight.hydra.hadoop.io} package + * The JsonInputFormat will read two types of JSON formatted data. The default + * expectation is each JSON record is newline delimited. This method is + * generally faster and is backed by the {@link LineRecordReader} you are likely + * familiar with. The other method is 'pretty print' of JSON records, where + * records span multiple lines and often have some type of root identifier. This + * method is likely slower, but respects record boundaries much like the + * LineRecordReader.<br> + * <br> + * Use of the 'pretty print' reader requires a record identifier. + */ +public class JsonInputFormat extends FileInputFormat<LongWritable, Text> { + + private static JsonFactory factory = new JsonFactory(); + + private static ObjectMapper mapper = new ObjectMapper(factory); + + public static final String ONE_RECORD_PER_LINE = "json.input.format.one.record.per.line"; + + public static final String RECORD_IDENTIFIER = "json.input.format.record.identifier"; + + @Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, + TaskAttemptContext context) throws IOException, InterruptedException { + RecordReader<LongWritable, Text> rdr; + if (context.getConfiguration().getBoolean(ONE_RECORD_PER_LINE, false)) { --- End diff -- ok. done --- |
Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2384#discussion_r197638048 --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/jsoninput/JsonInputFormat.java --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.processing.loading.jsoninput; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.security.InvalidParameterException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; +import org.apache.log4j.Logger; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; + +/** + * Code ported from Hydra-Spark {package com.pluralsight.hydra.hadoop.io} package + * The JsonInputFormat will read two types of JSON formatted data. The default + * expectation is each JSON record is newline delimited. This method is + * generally faster and is backed by the {@link LineRecordReader} you are likely + * familiar with. The other method is 'pretty print' of JSON records, where + * records span multiple lines and often have some type of root identifier. This + * method is likely slower, but respects record boundaries much like the + * LineRecordReader.<br> + * <br> + * Use of the 'pretty print' reader requires a record identifier. 
+ */ +public class JsonInputFormat extends FileInputFormat<LongWritable, Text> { + + private static JsonFactory factory = new JsonFactory(); + + private static ObjectMapper mapper = new ObjectMapper(factory); + + public static final String ONE_RECORD_PER_LINE = "json.input.format.one.record.per.line"; + + public static final String RECORD_IDENTIFIER = "json.input.format.record.identifier"; + + @Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, + TaskAttemptContext context) throws IOException, InterruptedException { + RecordReader<LongWritable, Text> rdr; + if (context.getConfiguration().getBoolean(ONE_RECORD_PER_LINE, false)) { + rdr = new SimpleJsonRecordReader(); + } else { + return new JsonRecordReader(); + } + rdr.initialize(split, context); + return rdr; + } + + /** + * This class uses the {@link LineRecordReader} to read a line of JSON and + * return it as a Text object. + */ + public static class SimpleJsonRecordReader extends RecordReader<LongWritable, Text> { + + private LineRecordReader rdr = null; + + private LongWritable outkey = new LongWritable(0L); + + private Text outvalue = new Text(); + + @Override public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + + rdr = new LineRecordReader(); + rdr.initialize(split, context); + } + + @Override public boolean nextKeyValue() throws IOException, InterruptedException { + if (rdr.nextKeyValue()) { + outvalue.set(rdr.getCurrentValue()); + outkey.set(rdr.getCurrentKey().get()); + return true; + } else { + return false; + } + } + + @Override public void close() throws IOException { + rdr.close(); + } + + @Override public float getProgress() throws IOException { + return rdr.getProgress(); + } + + @Override public LongWritable getCurrentKey() throws IOException, InterruptedException { + return outkey; + } + + @Override public Text getCurrentValue() throws IOException, InterruptedException { + return outvalue; + } + } + + /** + * This class uses the {@link JsonStreamReader} to read JSON records from a + * file. It respects split boundaries to complete full JSON records, as + * specified by the root identifier. This class will discard any records + * that it was unable to decode using + * {@link JsonInputFormat#decodeLineToJsonNode(String)} + */ + public static class JsonRecordReader extends RecordReader<LongWritable, Text> { + + private Logger LOG = Logger.getLogger(JsonRecordReader.class); + + private JsonStreamReader rdr = null; + + private long start = 0, end = 0; + + private float toRead = 0; + + private String identifier = null; + + private Logger log = Logger.getLogger(JsonRecordReader.class); + + private Text outJson = new Text(); + + private LongWritable outKey = new LongWritable(); + + @Override public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + + this.identifier = context.getConfiguration().get(RECORD_IDENTIFIER); --- End diff -- ok. done --- |
Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2384#discussion_r197638055 --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/jsoninput/JsonInputFormat.java --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.processing.loading.jsoninput; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.security.InvalidParameterException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; +import org.apache.log4j.Logger; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; + +/** + * Code ported from Hydra-Spark {package com.pluralsight.hydra.hadoop.io} package + * The JsonInputFormat will read two types of JSON formatted data. The default + * expectation is each JSON record is newline delimited. This method is + * generally faster and is backed by the {@link LineRecordReader} you are likely + * familiar with. The other method is 'pretty print' of JSON records, where + * records span multiple lines and often have some type of root identifier. This + * method is likely slower, but respects record boundaries much like the + * LineRecordReader.<br> + * <br> + * Use of the 'pretty print' reader requires a record identifier. 
+ */ +public class JsonInputFormat extends FileInputFormat<LongWritable, Text> { + + private static JsonFactory factory = new JsonFactory(); + + private static ObjectMapper mapper = new ObjectMapper(factory); + + public static final String ONE_RECORD_PER_LINE = "json.input.format.one.record.per.line"; + + public static final String RECORD_IDENTIFIER = "json.input.format.record.identifier"; + + @Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, + TaskAttemptContext context) throws IOException, InterruptedException { + RecordReader<LongWritable, Text> rdr; + if (context.getConfiguration().getBoolean(ONE_RECORD_PER_LINE, false)) { + rdr = new SimpleJsonRecordReader(); + } else { + return new JsonRecordReader(); + } + rdr.initialize(split, context); + return rdr; + } + + /** + * This class uses the {@link LineRecordReader} to read a line of JSON and + * return it as a Text object. + */ + public static class SimpleJsonRecordReader extends RecordReader<LongWritable, Text> { + + private LineRecordReader rdr = null; + + private LongWritable outkey = new LongWritable(0L); + + private Text outvalue = new Text(); + + @Override public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + + rdr = new LineRecordReader(); + rdr.initialize(split, context); + } + + @Override public boolean nextKeyValue() throws IOException, InterruptedException { + if (rdr.nextKeyValue()) { + outvalue.set(rdr.getCurrentValue()); + outkey.set(rdr.getCurrentKey().get()); + return true; + } else { + return false; + } + } + + @Override public void close() throws IOException { + rdr.close(); + } + + @Override public float getProgress() throws IOException { + return rdr.getProgress(); + } + + @Override public LongWritable getCurrentKey() throws IOException, InterruptedException { + return outkey; + } + + @Override public Text getCurrentValue() throws IOException, InterruptedException { + return outvalue; + } + } + + /** + * This class uses the {@link JsonStreamReader} to read JSON records from a + * file. It respects split boundaries to complete full JSON records, as + * specified by the root identifier. 
This class will discard any records + * that it was unable to decode using + * {@link JsonInputFormat#decodeLineToJsonNode(String)} + */ + public static class JsonRecordReader extends RecordReader<LongWritable, Text> { + + private Logger LOG = Logger.getLogger(JsonRecordReader.class); + + private JsonStreamReader rdr = null; + + private long start = 0, end = 0; + + private float toRead = 0; + + private String identifier = null; + + private Logger log = Logger.getLogger(JsonRecordReader.class); + + private Text outJson = new Text(); + + private LongWritable outKey = new LongWritable(); + + @Override public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + + this.identifier = context.getConfiguration().get(RECORD_IDENTIFIER); + + if (this.identifier == null || identifier.isEmpty()) { + throw new InvalidParameterException(JsonInputFormat.RECORD_IDENTIFIER + " is not set."); + } else { + LOG.info("Initializing JsonRecordReader with identifier " + identifier); + } + + FileSplit fSplit = (FileSplit) split; + + // get relevant data + Path file = fSplit.getPath(); + + log.info("File is " + file); + + start = fSplit.getStart(); + end = start + split.getLength(); + toRead = end - start; + + FSDataInputStream strm = FileSystem.get(context.getConfiguration()).open(file); + + if (start != 0) { + strm.seek(start); + } + + rdr = new JsonStreamReader(identifier, new BufferedInputStream(strm)); + } + + @Override public boolean nextKeyValue() throws IOException, InterruptedException { + boolean retval = false; + boolean keepGoing = false; + do { + keepGoing = false; + String record = rdr.getJsonRecord(); + if (record != null) { + if (JsonInputFormat.decodeLineToJsonNode(record) == null) { + keepGoing = true; + } else { + outJson.set(record); + outKey.set(rdr.getBytesRead()); + retval = true; + } + } + } while (keepGoing); + + return retval; + } + + @Override public void close() throws IOException { + rdr.close(); + } + + @Override public float getProgress() throws IOException { + return (float) rdr.getBytesRead() / toRead; + } + + @Override public LongWritable getCurrentKey() throws IOException, InterruptedException { --- End diff -- ok. done --- |
Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2384#discussion_r197638059 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.file; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.Random; +import java.util.UUID; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hadoop.internal.ObjectArrayWritable; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import org.apache.carbondata.util.JsonCarbonUtil; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; + +/** + * Writer Implementation to write Json Record to carbondata file. + * json writer requires the path of json file and carbon schema. + */ +@InterfaceAudience.User public class JsonCarbonWriter extends CarbonWriter { + private RecordWriter<NullWritable, ObjectArrayWritable> recordWriter; + private TaskAttemptContext context; + private ObjectArrayWritable writable; + private Schema carbonSchema; + + JsonCarbonWriter(CarbonLoadModel loadModel, Schema carbonSchema) throws IOException { + Configuration OutputHadoopConf = new Configuration(); + CarbonTableOutputFormat.setLoadModel(OutputHadoopConf, loadModel); + CarbonTableOutputFormat outputFormat = new CarbonTableOutputFormat(); + JobID jobId = new JobID(UUID.randomUUID().toString(), 0); + Random random = new Random(); + TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt()); + TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt()); + TaskAttemptContextImpl context = new TaskAttemptContextImpl(OutputHadoopConf, attemptID); + this.recordWriter = outputFormat.getRecordWriter(context); + this.context = context; + this.writable = new ObjectArrayWritable(); + this.carbonSchema = carbonSchema; + } + + /** + * Write single row data, accepts one row of data as json string + * + * @param object (json row as a string) + * @throws IOException + */ + @Override public void write(Object object) throws IOException { --- End diff -- ok. done --- |
Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2384#discussion_r197638053 --- Diff: processing/src/main/java/org/apache/carbondata/processing/loading/jsoninput/JsonInputFormat.java --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.processing.loading.jsoninput; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.security.InvalidParameterException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; +import org.apache.log4j.Logger; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; + +/** + * Code ported from Hydra-Spark {package com.pluralsight.hydra.hadoop.io} package + * The JsonInputFormat will read two types of JSON formatted data. The default + * expectation is each JSON record is newline delimited. This method is + * generally faster and is backed by the {@link LineRecordReader} you are likely + * familiar with. The other method is 'pretty print' of JSON records, where + * records span multiple lines and often have some type of root identifier. This + * method is likely slower, but respects record boundaries much like the + * LineRecordReader.<br> + * <br> + * Use of the 'pretty print' reader requires a record identifier. 
+ */ +public class JsonInputFormat extends FileInputFormat<LongWritable, Text> { + + private static JsonFactory factory = new JsonFactory(); + + private static ObjectMapper mapper = new ObjectMapper(factory); + + public static final String ONE_RECORD_PER_LINE = "json.input.format.one.record.per.line"; + + public static final String RECORD_IDENTIFIER = "json.input.format.record.identifier"; + + @Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, + TaskAttemptContext context) throws IOException, InterruptedException { + RecordReader<LongWritable, Text> rdr; + if (context.getConfiguration().getBoolean(ONE_RECORD_PER_LINE, false)) { + rdr = new SimpleJsonRecordReader(); + } else { + return new JsonRecordReader(); + } + rdr.initialize(split, context); + return rdr; + } + + /** + * This class uses the {@link LineRecordReader} to read a line of JSON and + * return it as a Text object. + */ + public static class SimpleJsonRecordReader extends RecordReader<LongWritable, Text> { + + private LineRecordReader rdr = null; + + private LongWritable outkey = new LongWritable(0L); + + private Text outvalue = new Text(); + + @Override public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + + rdr = new LineRecordReader(); + rdr.initialize(split, context); + } + + @Override public boolean nextKeyValue() throws IOException, InterruptedException { + if (rdr.nextKeyValue()) { + outvalue.set(rdr.getCurrentValue()); + outkey.set(rdr.getCurrentKey().get()); + return true; + } else { + return false; + } + } + + @Override public void close() throws IOException { + rdr.close(); + } + + @Override public float getProgress() throws IOException { + return rdr.getProgress(); + } + + @Override public LongWritable getCurrentKey() throws IOException, InterruptedException { + return outkey; + } + + @Override public Text getCurrentValue() throws IOException, InterruptedException { + return outvalue; + } + } + + /** + * This class uses the {@link JsonStreamReader} to read JSON records from a + * file. It respects split boundaries to complete full JSON records, as + * specified by the root identifier. 
This class will discard any records + * that it was unable to decode using + * {@link JsonInputFormat#decodeLineToJsonNode(String)} + */ + public static class JsonRecordReader extends RecordReader<LongWritable, Text> { + + private Logger LOG = Logger.getLogger(JsonRecordReader.class); + + private JsonStreamReader rdr = null; + + private long start = 0, end = 0; + + private float toRead = 0; + + private String identifier = null; + + private Logger log = Logger.getLogger(JsonRecordReader.class); + + private Text outJson = new Text(); + + private LongWritable outKey = new LongWritable(); + + @Override public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + + this.identifier = context.getConfiguration().get(RECORD_IDENTIFIER); + + if (this.identifier == null || identifier.isEmpty()) { + throw new InvalidParameterException(JsonInputFormat.RECORD_IDENTIFIER + " is not set."); + } else { + LOG.info("Initializing JsonRecordReader with identifier " + identifier); + } + + FileSplit fSplit = (FileSplit) split; + + // get relevant data + Path file = fSplit.getPath(); + + log.info("File is " + file); + + start = fSplit.getStart(); + end = start + split.getLength(); + toRead = end - start; + + FSDataInputStream strm = FileSystem.get(context.getConfiguration()).open(file); + + if (start != 0) { + strm.seek(start); + } + + rdr = new JsonStreamReader(identifier, new BufferedInputStream(strm)); + } + + @Override public boolean nextKeyValue() throws IOException, InterruptedException { + boolean retval = false; + boolean keepGoing = false; + do { + keepGoing = false; + String record = rdr.getJsonRecord(); + if (record != null) { + if (JsonInputFormat.decodeLineToJsonNode(record) == null) { + keepGoing = true; + } else { + outJson.set(record); + outKey.set(rdr.getBytesRead()); + retval = true; + } + } + } while (keepGoing); + + return retval; + } + + @Override public void close() throws IOException { + rdr.close(); + } + + @Override public float getProgress() throws IOException { + return (float) rdr.getBytesRead() / toRead; + } + + @Override public LongWritable getCurrentKey() throws IOException, InterruptedException { + return outKey; + } + + @Override public Text getCurrentValue() throws IOException, InterruptedException { + return outJson; + } + } + + /** + * Decodes a given string of text to a {@link JsonNode}. + * + * @param line The line of text + * @return The JsonNode or null if a JsonParseException, + * JsonMappingException, or IOException error occurs + */ + public static synchronized JsonNode decodeLineToJsonNode(String line) { + + try { + return mapper.readTree(line); + } catch (JsonParseException e) { + e.printStackTrace(); + return null; + } catch (JsonMappingException e) { + e.printStackTrace(); --- End diff -- ok. done --- |
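Since JsonParseException and JsonMappingException both extend IOException in the org.codehaus.jackson API, the cleaned-up decode helper could collapse the three catch blocks and log the failure instead of printing stack traces, while preserving the null-on-failure contract the readers rely on. A sketch, with an illustrative class name:

import java.io.IOException;

import org.apache.log4j.Logger;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;

final class JsonDecode {
  private static final Logger LOG = Logger.getLogger(JsonDecode.class);
  private static final ObjectMapper MAPPER = new ObjectMapper();

  /** Returns the parsed tree, or null so the caller can discard the undecodable record. */
  static synchronized JsonNode decode(String line) {
    try {
      return MAPPER.readTree(line);
    } catch (IOException e) { // covers JsonParseException and JsonMappingException too
      LOG.warn("Discarding undecodable JSON record: " + e.getMessage());
      return null;
    }
  }
}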
Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2384#discussion_r197638100 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java ---
@@ -0,0 +1,146 @@ (license header, imports, class declaration and constructor identical to the diff quoted above)
+  /**
+   * Write single row data, accepts one row of data as json string
+   *
+   * @param object (json row as a string)
+   * @throws IOException
+   */
+  @Override public void write(Object object) throws IOException {
+    Objects.requireNonNull(object, "Input cannot be null");
+    try {
+      Map<String, Object> jsonNodeMap;
+      ObjectMapper objectMapper = new ObjectMapper();
+      try {
+        jsonNodeMap =
+            objectMapper.readValue((String) object, new TypeReference<Map<String, Object>>() {
+            });
+      } catch (IOException e) {
+        throw new IOException("Failed to parse Json row string ");
+      }
+      // convert json object to carbon object.
+      Object[] writeObjects = JsonCarbonUtil.jsonToCarbonRecord(jsonNodeMap, carbonSchema);
--- End diff --

ok. done
---
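The parsing step in write() above is plain Jackson. A self-contained sketch of just that conversion (the class name and sample row are invented for illustration):

    import java.util.Map;
    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class JsonRowParseSketch {
      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // One JSON row as a string, exactly what write(Object) receives.
        String row = "{\"name\":\"bob\",\"age\":24}";
        // Deserialize into a generic map; the keys act as column names.
        Map<String, Object> jsonNodeMap =
            mapper.readValue(row, new TypeReference<Map<String, Object>>() { });
        System.out.println(jsonNodeMap); // {name=bob, age=24}
      }
    }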
Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2384#discussion_r197638101 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java ---
@@ -0,0 +1,146 @@ (identical to the previous comment's quote up to the body of write(Object))
+      writable.set(writeObjects);
+      recordWriter.write(NullWritable.get(), writable);
+    } catch (Exception e) {
+      close();
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Takes file or directory path,
+   * containing array of json rows to write carbondata files.
+   *
+   * @param inputFilePath
+   * @param recordIdentifier
+   * @throws IOException
+   */
+  public void writeFromJsonFile(String inputFilePath, String recordIdentifier) throws IOException {
--- End diff --

removed it.
---
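With writeFromJsonFile removed, splitting a file into rows becomes the caller's job (or JsonInputFormat's, per the earlier review comment). A rough caller-side sketch, assuming one JSON object per line; the class name and file path are hypothetical, and writer construction is omitted since the builder API is not part of this diff:

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    public class JsonFileFeedSketch {
      // Feeds each line of a JSON-lines file to an already-built writer.
      static void feed(JsonCarbonWriter writer, String inputFilePath) throws IOException {
        try (BufferedReader reader = Files.newBufferedReader(Paths.get(inputFilePath))) {
          String line;
          while ((line = reader.readLine()) != null) {
            writer.write(line); // each line is one JSON row
          }
        }
      }
    }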
Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2384#discussion_r197638106 --- Diff: store/sdk/src/main/java/org/apache/carbondata/util/JsonCarbonUtil.java ---
@@ -0,0 +1,197 @@ (standard Apache license header omitted; identical to the one quoted above)
+package org.apache.carbondata.util;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator;
+import org.apache.carbondata.core.metadata.datatype.ArrayType;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.metadata.datatype.StructType;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.loading.complexobjects.ArrayObject;
+import org.apache.carbondata.processing.loading.complexobjects.StructObject;
+import org.apache.carbondata.sdk.file.Field;
+import org.apache.carbondata.sdk.file.Schema;
+
+public class JsonCarbonUtil {
+
+  public static Object[] jsonToCarbonRecord(Map<String, Object> jsonNodeMap, Schema carbonSchema)
--- End diff --

ok. done
---
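At its core, jsonToCarbonRecord projects the parsed map onto the schema's column order. An illustrative stand-alone sketch of that core idea (flat schema only; the class and method names are hypothetical, and the real utility, judging by its imports, also converts dates, decimals, arrays, and structs):

    import java.util.List;
    import java.util.Map;

    public class JsonToRowSketch {
      // Orders one JSON row's values by the schema's column order.
      static Object[] toRecord(Map<String, Object> row, List<String> columnsInSchemaOrder) {
        Object[] record = new Object[columnsInSchemaOrder.size()];
        for (int i = 0; i < columnsInSchemaOrder.size(); i++) {
          // Missing keys map to null; per-type conversion is omitted here.
          record[i] = row.get(columnsInSchemaOrder.get(i));
        }
        return record;
      }
    }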
Github user ajantha-bhat commented on the issue:
https://github.com/apache/carbondata/pull/2384 @ravipesala: please review, the comments are addressed. ---