[GitHub] [carbondata] Indhumathi27 commented on a change in pull request #3819: [CARBONDATA-3855]support carbon SDK to load data from different files


GitBox

Indhumathi27 commented on a change in pull request #3819:
URL: https://github.com/apache/carbondata/pull/3819#discussion_r448811521



##########
File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/ParquetCarbonWriter.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.hadoop.ParquetReader;
+
+/**
+ * Implementation to write parquet rows in avro format to carbondata file.
+ */
+public class ParquetCarbonWriter extends AvroCarbonWriter {
+  private AvroCarbonWriter avroCarbonWriter = null;
+  private String filePath = "";
+  private boolean isDirectory = false;
+  private List<String> fileList;
+
+  ParquetCarbonWriter(AvroCarbonWriter avroCarbonWriter) {
+    this.avroCarbonWriter = avroCarbonWriter;
+  }
+
+  @Override
+  public void setFilePath(String filePath) {
+    this.filePath = filePath;
+  }
+
+  @Override
+  public void setIsDirectory(boolean isDirectory) {
+    this.isDirectory = isDirectory;
+  }
+
+  @Override
+  public void setFileList(List<String> fileList) {
+    this.fileList = fileList;
+  }
+
+  /**
+   * Load data of all parquet files at given location iteratively.
+   *
+   * @throws IOException
+   */
+  @Override
+  public void write() throws IOException {
+    if (this.filePath.length() == 0) {
+      throw new RuntimeException("'withParquetPath()' " +
+          "must be called to support load parquet files");
+    }
+    if (this.avroCarbonWriter == null) {
+      throw new RuntimeException("avro carbon writer can not be null");
+    }
+    if (this.isDirectory) {
+      if (this.fileList == null || this.fileList.size() == 0) {
+        File[] dataFiles = new File(this.filePath).listFiles();
+        if (dataFiles == null || dataFiles.length == 0) {
+          throw new RuntimeException("No Parquet file found at given location. Please provide " +
+              "the correct folder location.");
+        }
+        Arrays.sort(dataFiles);
+        for (File dataFile : dataFiles) {
+          this.loadSingleFile(dataFile);
+        }
+      } else {
+        for (String file : this.fileList) {
+          this.loadSingleFile(new File(this.filePath + "/" + file));
+        }
+      }
+    } else {
+      this.loadSingleFile(new File(this.filePath));
+    }
+  }
+
+  private void loadSingleFile(File file) throws IOException {
+    AvroReadSupport<GenericRecord> avroReadSupport = new AvroReadSupport<>();
+    ParquetReader<GenericRecord> parquetReader = ParquetReader.builder(avroReadSupport,
+        new Path(String.valueOf(file))).withConf(new Configuration()).build();
+    GenericRecord genericRecord = null;
+    while ((genericRecord = parquetReader.read()) != null) {
+      System.out.println(genericRecord);

Review comment:
       Remove this debug print line.
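
       For reference, a minimal sketch of the loop once the print is removed. The quoted hunk cuts off at the `println`, so the `write` and `close` calls below are assumptions about the rest of `loadSingleFile()`, not part of the PR:

       ```java
       // Sketch only: forwards each Parquet-decoded Avro record to the wrapped
       // writer instead of printing it. The write/close calls are assumed here,
       // since the quoted hunk ends at the println.
       GenericRecord genericRecord;
       while ((genericRecord = parquetReader.read()) != null) {
         this.avroCarbonWriter.write(genericRecord);
       }
       parquetReader.close();
       ```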

##########
File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/ORCCarbonWriter.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Implementation to write ORC rows in CSV format to carbondata file.
+ */
+public class ORCCarbonWriter extends CSVCarbonWriter {
+  private CSVCarbonWriter csvCarbonWriter = null;
+  private String filePath = "";
+  private Reader orcReader = null;
+  private boolean isDirectory = false;
+  private List<String> fileList;
+
+  ORCCarbonWriter(CSVCarbonWriter csvCarbonWriter) {
+    this.csvCarbonWriter = csvCarbonWriter;
+  }
+
+  @Override
+  public void setFilePath(String filePath) {
+    this.filePath = filePath;
+  }
+
+  @Override
+  public void setIsDirectory(boolean isDirectory) {
+    this.isDirectory = isDirectory;
+  }
+
+  @Override
+  public void setFileList(List<String> fileList) {
+    this.fileList = fileList;
+  }
+
+  /**
+   * Load ORC file in iterative way.
+   */
+  @Override
+  public void write() throws IOException {
+    if (this.filePath.length() == 0) {
+      throw new RuntimeException("'withOrcPath()' must be called to support load ORC files");
+    }
+    if (this.csvCarbonWriter == null) {
+      throw new RuntimeException("csv carbon writer can not be null");
+    }
+    if (this.isDirectory) {
+      if (this.fileList == null || this.fileList.size() == 0) {
+        File[] dataFiles = new File(this.filePath).listFiles();
+        if (dataFiles == null || dataFiles.length == 0) {
+          throw new RuntimeException("No ORC file found at given location. Please provide " +
+              "the correct folder location.");
+        }
+        for (File dataFile : dataFiles) {
+          this.loadSingleFile(dataFile);
+        }
+      } else {
+        for (String file : this.fileList) {
+          this.loadSingleFile(new File(this.filePath + "/" + file));
+        }
+      }
+    } else {
+      this.loadSingleFile(new File(this.filePath));
+    }
+  }
+
+  private void loadSingleFile(File file) throws IOException {
+    orcReader = OrcFile.createReader(new Path(String.valueOf(file)),
+        OrcFile.readerOptions(new Configuration()));
+    ObjectInspector objectInspector = orcReader.getObjectInspector();
+    RecordReader recordReader = orcReader.rows();
+    if (objectInspector instanceof StructObjectInspector) {
+      StructObjectInspector structObjectInspector =
+          (StructObjectInspector) orcReader.getObjectInspector();
+      while (recordReader.hasNext()) {
+        Object record = recordReader.next(null); // to remove duplicacy.
+        List valueList = structObjectInspector.getStructFieldsDataAsList(record);
+        for (int i = 0; i < valueList.size(); i++) {
+          valueList.set(i, parseOrcObject(valueList.get(i), 0));
+        }
+        this.csvCarbonWriter.write(valueList.toArray());
+      }
+    } else {
+      while (recordReader.hasNext()) {
+        Object record = recordReader.next(null); // to remove duplicacy.
+        this.csvCarbonWriter.write(new Object[]{parseOrcObject(record, 0)});
+      }
+    }
+  }
+
+  private String parseOrcObject(Object obj, int level) {
+    if (obj instanceof OrcStruct) {
+      Objects.requireNonNull(orcReader);
+      StructObjectInspector structObjectInspector = (StructObjectInspector) orcReader
+          .getObjectInspector();
+      List value = structObjectInspector.getStructFieldsDataAsList(obj);
+      for (int i = 0; i < value.size(); i++) {
+        value.set(i, parseOrcObject(value.get(i), level + 1));
+      }
+      String str = listToString(value, level);
+      if (str.length() > 0) {
+        return str.substring(0, str.length() - 1);
+      }
+      return null;
+    } else if (obj instanceof ArrayList) {
+      ArrayList listValue = (ArrayList) obj;
+      for (int i = 0; i < listValue.size(); i++) {
+        listValue.set(i, parseOrcObject(listValue.get(i), level + 1));
+      }
+      String str = listToString(listValue, level);
+      if (str.length() > 0) {
+        return str.substring(0, str.length() - 1);
+      }
+      return null;
+    } else if (obj instanceof LinkedHashMap) {
+      LinkedHashMap<Text, Object> keyValueRow = (LinkedHashMap<Text, Object>) obj;
+      for (Map.Entry<Text, Object> entry : keyValueRow.entrySet()) {
+        Object val = parseOrcObject(keyValueRow.get(entry.getKey()), level + 2);
+        keyValueRow.put(entry.getKey(), val);
+      }
+      StringBuilder str = new StringBuilder();
+      for (Map.Entry<Text, Object> entry : keyValueRow.entrySet()) {
+        Text key = entry.getKey();
+        str.append(key.toString()).append("$").append(keyValueRow.get(key)).append("#");
+      }
+      if (str.length() > 0) {
+        return str.substring(0, str.length() - 1);
+      }
+      return null;
+    }
+    if (obj == null) {
+      return null;
+    }
+    return obj.toString();
+  }
+
+  private String listToString(List value, int level) {
+    String delimeter = "";

Review comment:
       Rename `delimeter` to `delimiter`.
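
       A sketch of the rename in context. The hunk quotes only the declaration, so the body below is illustrative, inferred from how `parseOrcObject` trims a trailing delimiter and from the complex-type delimiters (`#`, `$`) the builder configures:

       ```java
       // Illustrative sketch: only the declaration line is quoted in the hunk;
       // the join logic is an assumption based on the surrounding code.
       private String listToString(List value, int level) {
         String delimiter = (level == 0) ? "#" : "$";  // renamed from "delimeter"
         StringBuilder builder = new StringBuilder();
         for (Object element : value) {
           builder.append(element).append(delimiter);
         }
         return builder.toString();  // callers strip the trailing delimiter
       }
       ```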

##########
File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/ORCCarbonWriter.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Implementation to write ORC rows in CSV format to carbondata file.
+ */
+public class ORCCarbonWriter extends CSVCarbonWriter {
+  private CSVCarbonWriter csvCarbonWriter = null;
+  private String filePath = "";
+  private Reader orcReader = null;
+  private boolean isDirectory = false;
+  private List<String> fileList;
+
+  ORCCarbonWriter(CSVCarbonWriter csvCarbonWriter) {
+    this.csvCarbonWriter = csvCarbonWriter;
+  }
+
+  @Override
+  public void setFilePath(String filePath) {
+    this.filePath = filePath;
+  }
+
+  @Override
+  public void setIsDirectory(boolean isDirectory) {
+    this.isDirectory = isDirectory;
+  }
+
+  @Override
+  public void setFileList(List<String> fileList) {
+    this.fileList = fileList;
+  }
+
+  /**
+   * Load ORC file in iterative way.
+   */
+  @Override
+  public void write() throws IOException {
+    if (this.filePath.length() == 0) {
+      throw new RuntimeException("'withOrcPath()' must be called to support load ORC files");
+    }
+    if (this.csvCarbonWriter == null) {
+      throw new RuntimeException("csv carbon writer can not be null");
+    }
+    if (this.isDirectory) {
+      if (this.fileList == null || this.fileList.size() == 0) {
+        File[] dataFiles = new File(this.filePath).listFiles();
+        if (dataFiles == null || dataFiles.length == 0) {
+          throw new RuntimeException("No ORC file found at given location. Please provide " +
+              "the correct folder location.");
+        }
+        for (File dataFile : dataFiles) {
+          this.loadSingleFile(dataFile);
+        }
+      } else {
+        for (String file : this.fileList) {
+          this.loadSingleFile(new File(this.filePath + "/" + file));
+        }
+      }
+    } else {
+      this.loadSingleFile(new File(this.filePath));
+    }
+  }
+
+  private void loadSingleFile(File file) throws IOException {
+    orcReader = OrcFile.createReader(new Path(String.valueOf(file)),
+        OrcFile.readerOptions(new Configuration()));
+    ObjectInspector objectInspector = orcReader.getObjectInspector();
+    RecordReader recordReader = orcReader.rows();
+    if (objectInspector instanceof StructObjectInspector) {
+      StructObjectInspector structObjectInspector =
+          (StructObjectInspector) orcReader.getObjectInspector();
+      while (recordReader.hasNext()) {
+        Object record = recordReader.next(null); // to remove duplicacy.
+        List valueList = structObjectInspector.getStructFieldsDataAsList(record);
+        for (int i = 0; i < valueList.size(); i++) {
+          valueList.set(i, parseOrcObject(valueList.get(i), 0));
+        }
+        this.csvCarbonWriter.write(valueList.toArray());
+      }
+    } else {
+      while (recordReader.hasNext()) {
+        Object record = recordReader.next(null); // to remove duplicacy.
+        this.csvCarbonWriter.write(new Object[]{parseOrcObject(record, 0)});
+      }
+    }
+  }
+
+  private String parseOrcObject(Object obj, int level) {

Review comment:
       Rename `obj` to `recordObject`.
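
       With the rename applied, the method would read, for example:

       ```java
       // Rename sketch only: behavior unchanged, parameter name made descriptive.
       private String parseOrcObject(Object recordObject, int level) {
         // ... same dispatch on OrcStruct / ArrayList / LinkedHashMap as above,
         // with every use of "obj" replaced by "recordObject"
         if (recordObject == null) {
           return null;
         }
         return recordObject.toString();
       }
       ```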

##########
File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
##########
@@ -594,6 +613,332 @@ public CarbonWriterBuilder withJsonInput(Schema carbonSchema) {
     return this;
   }
 
+  /**
+   * to build a {@link CarbonWriter}, which accepts loading CSV files.
+   *
+   * @param filePath absolute path under which files should be loaded.
+   * @return CarbonWriterBuilder
+   */
+  public CarbonWriterBuilder withCsvPath(String filePath) {
+    if (filePath.length() == 0) {
+      throw new IllegalArgumentException("filePath can not be empty");
+    }
+    this.filePath = filePath;
+    this.isDirectory = new File(filePath).isDirectory();
+    this.withCsvInput();
+    return this;
+  }
+
+  /**
+   * to build a {@link CarbonWriter}, which accepts CSV files directory and
+   * list of file which has to be loaded.
+   *
+   * @param filePath directory where the CSV file exists.
+   * @param fileList list of files which has to be loaded.
+   * @return CarbonWriterBuilder
+   */
+  public CarbonWriterBuilder withCsvPath(String filePath, List<String> fileList) {
+    this.fileList = fileList;
+    this.withCsvPath(filePath);
+    return this;
+  }
+
+  /**
+   * to build a {@link CarbonWriter}, which accepts loading Parquet files.
+   *
+   * @param filePath absolute path under which files should be loaded.
+   * @return CarbonWriterBuilder
+   */
+  public CarbonWriterBuilder withParquetPath(String filePath) throws IOException {
+    if (filePath.length() == 0) {
+      throw new IllegalArgumentException("filePath can not be empty");
+    }
+    this.filePath = filePath;
+    this.isDirectory = new File(filePath).isDirectory();
+    this.writerType = WRITER_TYPE.PARQUET;
+    this.buildParquetReader();
+    return this;
+  }
+
+  /**
+   * to build a {@link CarbonWriter}, which accepts parquet files directory and
+   * list of file which has to be loaded.
+   *
+   * @param filePath directory where the parquet file exists.
+   * @param fileList list of files which has to be loaded.
+   * @return CarbonWriterBuilder
+   * @throws IOException
+   */
+  public CarbonWriterBuilder withParquetPath(String filePath, List<String> fileList)
+          throws IOException {
+    this.fileList = fileList;
+    this.withParquetPath(filePath);
+    return this;
+  }
+
+  private void buildParquetReader() throws IOException {
+    AvroReadSupport<GenericRecord> avroReadSupport = new AvroReadSupport<>();
+    ParquetReader<GenericRecord> parquetReader;
+    if (this.isDirectory) {
+      if (this.fileList == null || this.fileList.size() == 0) {
+        File[] dataFiles = new File(this.filePath).listFiles();
+        if (dataFiles == null || dataFiles.length == 0) {
+          throw new RuntimeException("No Parquet file found at given location. Please provide" +
+                  "the correct folder location.");
+        }
+        parquetReader = ParquetReader.builder(avroReadSupport,
+                new Path(String.valueOf(dataFiles[0]))).build();
+      } else {
+        parquetReader = ParquetReader.builder(avroReadSupport,
+                new Path(this.filePath + "/" + this.fileList.get(0))).build();
+      }
+    } else {
+      parquetReader = ParquetReader.builder(avroReadSupport,
+              new Path(this.filePath)).build();
+    }
+    this.avroSchema = parquetReader.read().getSchema();
+    this.schema = AvroCarbonWriter.getCarbonSchemaFromAvroSchema(this.avroSchema);
+  }
+
+  /**
+   * to build a {@link CarbonWriter}, which accepts loading ORC files.
+   *
+   * @param filePath absolute path under which files should be loaded.
+   * @return CarbonWriterBuilder
+   */
+  public CarbonWriterBuilder withOrcPath(String filePath) throws IOException {
+    if (filePath.length() == 0) {
+      throw new IllegalArgumentException("filePath can not be empty");
+    }
+    this.filePath = filePath;
+    this.isDirectory = new File(filePath).isDirectory();
+    this.writerType = WRITER_TYPE.ORC;
+    Map<String, String> options = new HashMap<>();
+    options.put("complex_delimiter_level_1", "#");
+    options.put("complex_delimiter_level_2", "$");
+    options.put("complex_delimiter_level_3", "@");
+    this.withLoadOptions(options);
+    this.buildOrcReader();
+    return this;
+  }
+
+  /**
+   * to build a {@link CarbonWriter}, which accepts orc files directory and
+   * list of file which has to be loaded.
+   *
+   * @param filePath directory where the orc file exists.
+   * @param fileList list of files which has to be loaded.
+   * @return CarbonWriterBuilder
+   * @throws IOException
+   */
+  public CarbonWriterBuilder withOrcPath(String filePath, List<String> fileList)
+          throws IOException {
+    this.fileList = fileList;
+    this.withOrcPath(filePath);
+    return this;
+  }
+
+  // build orc reader and convert orc schema to carbon schema.
+  private void buildOrcReader() throws IOException {
+    Reader orcReader;
+    if (this.isDirectory) {
+      if (this.fileList == null || this.fileList.size() == 0) {
+        File[] dataFiles = new File(this.filePath).listFiles();

Review comment:
       The common code that lists `dataFiles` can be extracted into a helper method and reused across the ORC, CSV, Parquet, and Avro paths.
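
       One possible shape for that helper, as a sketch (the name `listDataFiles` and its exact message are illustrative, not from the PR):

       ```java
       // Sketch of a shared helper: lists and validates the data files under a
       // folder so the ORC, CSV, Parquet, and Avro paths reuse one code path.
       private File[] listDataFiles(String folderPath, String format) {
         File[] dataFiles = new File(folderPath).listFiles();
         if (dataFiles == null || dataFiles.length == 0) {
           throw new RuntimeException("No " + format + " file found at given location. "
               + "Please provide the correct folder location.");
         }
         Arrays.sort(dataFiles);  // keep a deterministic load order across formats
         return dataFiles;
       }
       ```

       Each `with*Path()` builder method and writer `write()` loop could then call, say, `listDataFiles(this.filePath, "Parquet")` instead of repeating the listing and validation.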

##########
File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/ParquetCarbonWriter.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.hadoop.ParquetReader;
+
+/**
+ * Implementation to write parquet rows in avro format to carbondata file.
+ */
+public class ParquetCarbonWriter extends AvroCarbonWriter {
+  private AvroCarbonWriter avroCarbonWriter = null;
+  private String filePath = "";
+  private boolean isDirectory = false;
+  private List<String> fileList;
+
+  ParquetCarbonWriter(AvroCarbonWriter avroCarbonWriter) {
+    this.avroCarbonWriter = avroCarbonWriter;
+  }
+
+  @Override
+  public void setFilePath(String filePath) {
+    this.filePath = filePath;
+  }
+
+  @Override
+  public void setIsDirectory(boolean isDirectory) {
+    this.isDirectory = isDirectory;
+  }
+
+  @Override
+  public void setFileList(List<String> fileList) {
+    this.fileList = fileList;
+  }
+
+  /**
+   * Load data of all parquet files at given location iteratively.
+   *
+   * @throws IOException
+   */
+  @Override
+  public void write() throws IOException {
+    if (this.filePath.length() == 0) {
+      throw new RuntimeException("'withParquetPath()' " +
+          "must be called to support load parquet files");
+    }
+    if (this.avroCarbonWriter == null) {
+      throw new RuntimeException("avro carbon writer can not be null");
+    }
+    if (this.isDirectory) {
+      if (this.fileList == null || this.fileList.size() == 0) {
+        File[] dataFiles = new File(this.filePath).listFiles();
+        if (dataFiles == null || dataFiles.length == 0) {
+          throw new RuntimeException("No Parquet file found at given location. Please provide " +
+              "the correct folder location.");
+        }
+        Arrays.sort(dataFiles);
+        for (File dataFile : dataFiles) {
+          this.loadSingleFile(dataFile);
+        }
+      } else {
+        for (String file : this.fileList) {
+          this.loadSingleFile(new File(this.filePath + "/" + file));
+        }
+      }
+    } else {
+      this.loadSingleFile(new File(this.filePath));
+    }
+  }
+
+  private void loadSingleFile(File file) throws IOException {
+    AvroReadSupport<GenericRecord> avroReadSupport = new AvroReadSupport<>();
+    ParquetReader<GenericRecord> parquetReader = ParquetReader.builder(avroReadSupport,
+        new Path(String.valueOf(file))).withConf(new Configuration()).build();
+    GenericRecord genericRecord = null;

Review comment:
       ```suggestion
           GenericRecord genericRecord;
       ```
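
       Dropping the redundant `= null` initializer is safe here: the variable is definitely assigned by the loop condition before any use, so the compiler still enforces initialization.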




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]