xubo245 commented on a change in pull request #3819: URL: https://github.com/apache/carbondata/pull/3819#discussion_r464179310 ########## File path: sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java ########## @@ -594,6 +614,428 @@ public CarbonWriterBuilder withJsonInput(Schema carbonSchema) { return this; } + private void validateCsvFiles() throws IOException { + File[] dataFiles = this.extractDataFiles(); + for (File dataFile : dataFiles) { + try { + CsvParser csvParser = SDKUtil.buildCsvParser(); + csvParser.beginParsing(dataFile); + } catch (IllegalArgumentException ex) { + if (ex.getCause() instanceof FileNotFoundException) { + throw new FileNotFoundException("File " + dataFile + + " not found to build carbon writer."); + } + throw ex; + } + } + this.dataFiles = dataFiles; + } + + /** + * to build a {@link CarbonWriter}, which accepts loading CSV files. + * + * @param filePath absolute path under which files should be loaded. + * @return CarbonWriterBuilder + */ + public CarbonWriterBuilder withCsvPath(String filePath) throws IOException { + if (filePath.length() == 0) { + throw new IllegalArgumentException("filePath can not be empty"); + } + this.filePath = filePath; + this.isDirectory = new File(filePath).isDirectory(); + this.withCsvInput(); + this.validateCsvFiles(); + return this; + } + + /** + * to build a {@link CarbonWriter}, which accepts CSV files directory and + * list of file which has to be loaded. + * + * @param filePath directory where the CSV file exists. + * @param fileList list of files which has to be loaded. + * @return CarbonWriterBuilder + */ + public CarbonWriterBuilder withCsvPath(String filePath, List<String> fileList) + throws IOException { + this.fileList = fileList; + this.withCsvPath(filePath); + return this; + } + + private void validateJsonFiles() throws IOException { + File[] dataFiles = this.extractDataFiles(); + for (File dataFile : dataFiles) { + try { + new JSONParser().parse(SDKUtil.buildJsonReader(dataFile)); + } catch (FileNotFoundException ex) { + throw new FileNotFoundException("File " + dataFile + " not found to build carbon writer."); + } catch (ParseException ex) { + throw new RuntimeException("File " + dataFile + " is not in json format."); + } + } + this.dataFiles = dataFiles; + } + + /** + * to build a {@link CarbonWriter}, which accepts loading JSON files. + * + * @param filePath absolute path under which files should be loaded. + * @return CarbonWriterBuilder + */ + public CarbonWriterBuilder withJsonPath(String filePath) throws IOException { + if (filePath.length() == 0) { + throw new IllegalArgumentException("filePath can not be empty"); + } + this.filePath = filePath; + this.isDirectory = new File(filePath).isDirectory(); + this.withJsonInput(); + this.validateJsonFiles(); + return this; + } + + /** + * to build a {@link CarbonWriter}, which accepts JSON file directory and + * list of file which has to be loaded. + * + * @param filePath directory where the json file exists. + * @param fileList list of files which has to be loaded. + * @return CarbonWriterBuilder + * @throws IOException + */ + public CarbonWriterBuilder withJsonPath(String filePath, List<String> fileList) + throws IOException { + this.fileList = fileList; + this.withJsonPath(filePath); + return this; + } + + /** + * to build a {@link CarbonWriter}, which accepts loading Parquet files. + * + * @param filePath absolute path under which files should be loaded. + * @return CarbonWriterBuilder + */ + public CarbonWriterBuilder withParquetPath(String filePath) throws IOException { + if (filePath.length() == 0) { + throw new IllegalArgumentException("filePath can not be empty"); + } + this.filePath = filePath; + this.isDirectory = new File(filePath).isDirectory(); + this.writerType = WRITER_TYPE.PARQUET; + this.validateParquetFiles(); + return this; + } + + /** + * to build a {@link CarbonWriter}, which accepts parquet files directory and + * list of file which has to be loaded. + * + * @param filePath directory where the parquet file exists. + * @param fileList list of files which has to be loaded. + * @return CarbonWriterBuilder + * @throws IOException + */ + public CarbonWriterBuilder withParquetPath(String filePath, List<String> fileList) + throws IOException { + this.fileList = fileList; + this.withParquetPath(filePath); + return this; + } + + private void validateParquetFiles() throws IOException { + File[] dataFiles = this.extractDataFiles(); + org.apache.avro.Schema parquetSchema = null; + for (File dataFile : dataFiles) { + try { + ParquetReader<GenericRecord> parquetReader = + SDKUtil.buildPqrquetReader(String.valueOf(dataFile)); + if (parquetSchema == null) { + parquetSchema = parquetReader.read().getSchema(); + } else { + if (!parquetSchema.equals(parquetReader.read().getSchema())) { + throw new RuntimeException("All the parquet files must be having the same schema."); + } + } + } catch (IllegalArgumentException ex) { + if (ex.getMessage().contains("INT96 not yet implemented")) { + throw new IllegalArgumentException("Carbon does not support parquet INT96 data type."); + } + throw ex; + } catch (UnsupportedOperationException ex) { + if (ex.getMessage().contains("REPEATED not supported outside LIST or MAP.")) { + throw new UnsupportedOperationException("Carbon does not support " + + "repeated parquet schema outside of list or map."); + } + throw ex; + } catch (RuntimeException ex) { + if (ex.getMessage().contains("not a Parquet file")) { + throw new RuntimeException("File " + dataFile + " is not in parquet format."); + } + throw ex; + } + } + this.dataFiles = dataFiles; + this.avroSchema = parquetSchema; + this.schema = AvroCarbonWriter.getCarbonSchemaFromAvroSchema(this.avroSchema); + } + + private File[] extractDataFiles() { + File[] dataFiles; + if (this.isDirectory) { + if (this.fileList == null || this.fileList.size() == 0) { + dataFiles = SDKUtil.extractFilesFromFolder(this.filePath); + } else { + dataFiles = this.appendFileListWithPath(); + } + } else { + dataFiles = new File[]{new File(this.filePath)}; + } + return dataFiles; + } + + /** + * to build a {@link CarbonWriter}, which accepts loading ORC files. + * + * @param filePath absolute path under which files should be loaded. + * @return CarbonWriterBuilder + */ + public CarbonWriterBuilder withOrcPath(String filePath) throws IOException { + if (filePath.length() == 0) { + throw new IllegalArgumentException("filePath can not be empty"); + } + this.filePath = filePath; + this.isDirectory = new File(filePath).isDirectory(); + this.writerType = WRITER_TYPE.ORC; + Map<String, String> options = new HashMap<>(); + options.put("complex_delimiter_level_1", "#"); + options.put("complex_delimiter_level_2", "$"); + options.put("complex_delimiter_level_3", "@"); + this.withLoadOptions(options); + this.buildOrcReader(); + return this; + } + + /** + * to build a {@link CarbonWriter}, which accepts orc files directory and + * list of file which has to be loaded. + * + * @param filePath directory where the orc file exists. + * @param fileList list of files which has to be loaded. + * @return CarbonWriterBuilder + * @throws IOException + */ + public CarbonWriterBuilder withOrcPath(String filePath, List<String> fileList) + throws IOException { + this.fileList = fileList; + this.withOrcPath(filePath); + return this; + } + + private void compareAllOrcFilesSchema(File[] dataFiles) throws IOException { + TypeDescription orcSchema = null; + for (File dataFile : dataFiles) { + Reader orcReader = SDKUtil.buildOrcReader(String.valueOf(dataFile)); + if (orcSchema == null) { + orcSchema = orcReader.getSchema(); + } else { + if (!orcSchema.toString().equals(orcReader.getSchema().toString())) { + throw new RuntimeException("All the ORC files must be having the same schema."); + } + } + } + this.dataFiles = dataFiles; + } + + private File[] appendFileListWithPath() { + File[] dataFiles = new File[this.fileList.size()]; + for (int i = 0; i < this.fileList.size(); i++) { + dataFiles[i] = new File(this.filePath + "/" + this.fileList.get(i)); + } + return dataFiles; + } + + // build orc reader and convert orc schema to carbon schema. + private void buildOrcReader() throws IOException { + Reader orcReader; + File[] dataFiles = this.extractDataFiles(); + this.compareAllOrcFilesSchema(dataFiles); + orcReader = SDKUtil.buildOrcReader(String.valueOf(dataFiles[0])); + TypeDescription typeDescription = orcReader.getSchema(); + List<String> fieldList; + try { + fieldList = typeDescription.getFieldNames(); + } catch (NullPointerException e) { + throw new RuntimeException("Schema can not be null of ORC file."); + } + Field field = orcToCarbonSchemaConverter(typeDescription, + fieldList, typeDescription.getCategory().getName()); + String fieldType = field.getDataType().toString(); + if (fieldType.equalsIgnoreCase("struct")) { + int size = field.getChildren().size(); + Field[] fields = new Field[size]; + for (int i = 0; i < size; i++) { + StructField columnDetails = field.getChildren().get(i); + fields[i] = new Field(columnDetails.getFieldName(), + columnDetails.getDataType(), columnDetails.getChildren()); + } + this.schema = new Schema(fields); + } else { + Field[] fields = new Field[1]; + fields[0] = field; + this.schema = new Schema(fields); + } + } + + // TO convert ORC schema to carbon schema + private Field orcToCarbonSchemaConverter(TypeDescription typeDescription, + List<String> fieldsName, String colName) { + Objects.requireNonNull(typeDescription, "orc typeDescription should not be null"); + Objects.requireNonNull(typeDescription.getCategory(), + "typeDescription category should not be null"); + if (colName == null) { + colName = typeDescription.getCategory().getName(); + } + switch (typeDescription.getCategory()) { + case BOOLEAN: + return new Field(colName, "boolean"); + case BYTE: + case BINARY: + return new Field(colName, "binary"); + case SHORT: + return new Field(colName, "short"); + case INT: + return new Field(colName, "int"); + case LONG: + return new Field(colName, "long"); + case FLOAT: + return new Field(colName, "float"); + case DOUBLE: + return new Field(colName, "double"); + case DECIMAL: + return new Field(colName, "decimal"); + case STRING: + return new Field(colName, "string"); + case CHAR: + case VARCHAR: + return new Field(colName, "varchar"); + case DATE: + return new Field(colName, "date"); + case TIMESTAMP: + return new Field(colName, "timestamp"); + case STRUCT: + List<TypeDescription> childSchemas = typeDescription.getChildren(); + Field[] childs = new Field[childSchemas.size()]; + childSchema(childs, childSchemas, fieldsName); + List<StructField> structList = new ArrayList<>(); + for (int i = 0; i < childSchemas.size(); i++) { + structList.add(new StructField(childs[i].getFieldName(), + childs[i].getDataType(), childs[i].getChildren())); + } + return new Field(colName, "struct", structList); + case LIST: + childSchemas = typeDescription.getChildren(); + childs = new Field[childSchemas.size()]; + childSchema(childs, childSchemas, fieldsName); + List<StructField> arrayField = new ArrayList<>(); + for (int i = 0; i < childSchemas.size(); i++) { + arrayField.add(new StructField(childs[i].getFieldName(), + childs[i].getDataType(), childs[i].getChildren())); + } + return new Field(colName, "array", arrayField); + case MAP: + childSchemas = typeDescription.getChildren(); + childs = new Field[childSchemas.size()]; + childSchema(childs, childSchemas, fieldsName); + ArrayList<StructField> keyValueFields = new ArrayList<>(); + StructField keyField = new StructField(typeDescription.getCategory().getName() + ".key", + childs[0].getDataType()); + StructField valueField = new StructField(typeDescription.getCategory().getName() + ".value", + childs[1].getDataType(), childs[1].getChildren()); + keyValueFields.add(keyField); + keyValueFields.add(valueField); + StructField mapKeyValueField = + new StructField(typeDescription.getCategory().getName() + ".val", + DataTypes.createStructType(keyValueFields), keyValueFields); + MapType mapType = + DataTypes.createMapType(DataTypes.STRING, mapKeyValueField.getDataType()); + List<StructField> mapStructFields = new ArrayList<>(); + mapStructFields.add(mapKeyValueField); + return new Field(colName, mapType, mapStructFields); + default: + throw new UnsupportedOperationException( + "carbon not support " + typeDescription.getCategory().getName() + " orc type yet"); + } + } + + // extract child schema from the ORC type description. + private Field[] childSchema(Field[] childs, + List<TypeDescription> childSchemas, List<String> fieldsName) { + for (int i = 0; i < childSchemas.size(); i++) { + List<String> fieldList = null; + try { + fieldList = childSchemas.get(i).getFieldNames(); + } catch (NullPointerException e) { + LOGGER.info("quad tree disJoint with query polygon envelope return"); + } + childs[i] = orcToCarbonSchemaConverter(childSchemas.get(i), fieldList, + fieldsName == null ? null : fieldsName.get(i)); + } + return childs; + } + + /** + * to build a {@link CarbonWriter}, which accepts loading AVRO files. + * + * @param filePath absolute path under which files should be loaded. + * @return CarbonWriterBuilder + */ + public CarbonWriterBuilder withAvroPath(String filePath) throws IOException { + if (filePath.length() == 0) { Review comment: please check all in this PR ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
Free forum by Nabble | Edit this page |