Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2687 Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8386/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2687 Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/315/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2687 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/155/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2687 Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8394/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2687 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/323/ --- |
In reply to this post by qiuchenjian-2
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2687#discussion_r216122505 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala --- @@ -884,4 +962,220 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(1728000, Row(1728000)))) } + test("test logical type decimal through Json") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "id", + | "type": {"type" : "bytes", + | "logicalType": "decimal", + | "precision": 5, + | "scale": 2 + | } + |} + | ] + |}""".stripMargin + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val logicalType = LogicalTypes.decimal(5, 2) + val decimal = new java.math.BigDecimal("12.8").setScale(2) + //get unscaled 2's complement bytearray + val bytes = + decimalConversion.toBytes(decimal, nn.getField("id").schema, logicalType) + val data = DatatypeConverter.printBase64Binary(bytes.array()) + val json1 = + s"""{"id":"$data"}""".stripMargin + val record = testUtil.jsonToAvro(json1, schema1) + val data1 = new String(record.get(0).asInstanceOf[ByteBuffer].array(), + CarbonCommonConstants.DEFAULT_CHARSET_CLASS) + val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1)) + val avroRec = new GenericData. 
Record(nn) + avroRec.put("id", bytes1) + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(avroRec) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkExistence(sql("select * from sdkOutputTable"), true, "12.80") + } + + test("test logical type decimal through Avro") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "id", + | "type": {"type" : "bytes", + | "logicalType": "decimal", + | "precision": 5, + | "scale": 2 + | } + |} + | ] + |}""".stripMargin + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val logicalType = LogicalTypes.decimal(5, 2) + val decimal = new java.math.BigDecimal("12.8").setScale(2) + //get unscaled 2's complement bytearray + val bytes = + decimalConversion.toBytes(decimal, nn.getField("id").schema, logicalType) + val data = DatatypeConverter.printBase64Binary(bytes.array()) + val json1 = + s"""{"id":"$data"}""".stripMargin + val avroRec = new GenericData. Record(nn) + avroRec.put("id", bytes) + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(avroRec) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkExistence(sql("select * from sdkOutputTable"), true, "12.80") + } + + test("test logical type decimal with data having greater precision") { --- End diff -- Please add tests to verify boundary values, such as negative and very large decimal values (a big precision needs a long to store the unscaled value). --- |
In reply to this post by qiuchenjian-2
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2687#discussion_r216122590 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java --- @@ -198,14 +199,35 @@ private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) { if (unionField.getType().equals(Schema.Type.NULL)) { continue; } - if (checkFieldValueType(unionField.getType(), fieldValue)) { + // Union may not contain more than one schema with the same type, + // except for the named types record,fixed and enum + // hence check for schema also in case of union of multiple record or enum or fixed type + if (checkFieldValueType(unionField.getType(), fieldValue, unionField)) { values[j] = avroFieldToObjectForUnionType(unionField, fieldValue, avroField); break; } j++; } out = new StructObject(values); break; + case BYTES: + // DECIMAL type is defined in Avro as a BYTE type with the logicalType property + // set to "decimal" and a specified precision and scale + // As binary type is not supported yet,value will be null --- End diff -- We should throw unsupported data type exception for bytes. Don't store null. --- |
In reply to this post by qiuchenjian-2
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2687#discussion_r216122672 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala --- @@ -884,4 +962,220 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(1728000, Row(1728000)))) } + test("test logical type decimal through Json") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "id", + | "type": {"type" : "bytes", + | "logicalType": "decimal", + | "precision": 5, + | "scale": 2 + | } + |} + | ] + |}""".stripMargin + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val logicalType = LogicalTypes.decimal(5, 2) + val decimal = new java.math.BigDecimal("12.8").setScale(2) + //get unscaled 2's complement bytearray + val bytes = + decimalConversion.toBytes(decimal, nn.getField("id").schema, logicalType) + val data = DatatypeConverter.printBase64Binary(bytes.array()) + val json1 = + s"""{"id":"$data"}""".stripMargin + val record = testUtil.jsonToAvro(json1, schema1) + val data1 = new String(record.get(0).asInstanceOf[ByteBuffer].array(), + CarbonCommonConstants.DEFAULT_CHARSET_CLASS) + val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1)) + val avroRec = new GenericData. 
Record(nn) + avroRec.put("id", bytes1) + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(avroRec) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkExistence(sql("select * from sdkOutputTable"), true, "12.80") + } + + test("test logical type decimal through Avro") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "id", + | "type": {"type" : "bytes", + | "logicalType": "decimal", + | "precision": 5, + | "scale": 2 + | } + |} + | ] + |}""".stripMargin + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val logicalType = LogicalTypes.decimal(5, 2) + val decimal = new java.math.BigDecimal("12.8").setScale(2) + //get unscaled 2's complement bytearray + val bytes = + decimalConversion.toBytes(decimal, nn.getField("id").schema, logicalType) + val data = DatatypeConverter.printBase64Binary(bytes.array()) + val json1 = + s"""{"id":"$data"}""".stripMargin + val avroRec = new GenericData. 
Record(nn) + avroRec.put("id", bytes) + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(avroRec) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkExistence(sql("select * from sdkOutputTable"), true, "12.80") + } + + test("test logical type decimal with data having greater precision") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "id", + | "type": {"type" : "bytes", + | "logicalType": "decimal", + | "precision": 5, + | "scale": 2 + | } + |} + | ] + |}""".stripMargin + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val logicalType = LogicalTypes.decimal(5, 2) + val decimal = new java.math.BigDecimal("1218").setScale(2) + //get unscaled 2's complement bytearray + val bytes = + decimalConversion.toBytes(decimal, nn.getField("id").schema, logicalType) + val data = DatatypeConverter.printBase64Binary(bytes.array()) + val json1 = + s"""{"id":"$data"}""".stripMargin + val avroRec = new GenericData. 
Record(nn) + avroRec.put("id", bytes) + val exception1 = intercept[Exception] { + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(avroRec) + writer.close() + } + assert(exception1.getMessage + .contains("Data Loading failed as value Precision 6 is greater than specified Precision 5 in Avro Schema")) + } + + test("test union with multiple record type") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "test.avro", + | "type": "record", + | "name": "NewCar2", + | "fields": [ + | { + | "name": "optionalExtra", + | "type": ["null",{ + | "type":"record", + | "name":"Stereo", + | "fields" :[{ + | "name":"make", + | "type":"string" + | }, + | { + | "name":"speakers", + | "type":"int" + | }] + | },{ + | "type":"record", + | "name":"LeatherTrim", + | "fields":[{ + | "name":"colour", + | "type":"string" + | }] + | }], + | "default":null + | }] + | + |}""".stripMargin + + val json1 = + """{"optionalExtra":{"test.avro.LeatherTrim":{"colour":"ab"}}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), + Seq(Row(Row(Row(null,null),Row("ab"))))) + } + + test("test union with multiple Enum type") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "test.avro", + | "type": "record", + | "name": "Union_data3", + | "fields": [ + | { + | "name": "emp_id", --- End diff -- Please 
use related names for the field; emp_id and the union types are not related. --- |
In reply to this post by qiuchenjian-2
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2687#discussion_r216122724 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java --- @@ -218,9 +240,10 @@ private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) { * * @param type * @param fieldValue + * @param unionField * @return */ - private boolean checkFieldValueType(Schema.Type type, Object fieldValue) { + private boolean checkFieldValueType(Schema.Type type, Object fieldValue, Schema unionField) { --- End diff -- Rename method to validateUnionFieldValue() --- |
In reply to this post by qiuchenjian-2
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2687#discussion_r216123042 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala --- @@ -728,14 +748,46 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef |]} """.stripMargin - val json1 = """ {"name":"bob", "age":10.24, "address" : {"street":"abc", "city":"32"}} """ - val nn = new org.apache.avro.Schema.Parser().parse(schema1) + + val logicalType = LogicalTypes.decimal(4, 2) + val decimal1 = new java.math.BigDecimal("32").setScale(2) + //get unscaled 2's complement bytearray + val bytes = + decimalConversion.toBytes(decimal1, nn.getField("address").schema, logicalType) + val data = DatatypeConverter.printBase64Binary(bytes.array()) + val json1 = s""" {"name":"bob", "age":10.24, "address" : {"street":"abc", "city":"$data"}} """ val record = testUtil.jsonToAvro(json1, schema1) + val jsonData = new String(record.get(2).asInstanceOf[GenericData.Record].get(1) + .asInstanceOf[ByteBuffer].array(), + CarbonCommonConstants.DEFAULT_CHARSET_CLASS) + val bytesValue = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(jsonData)) + val mySchema = + """ + |{"name": "address", + | "type": "record", + | "fields": [ + | { "name": "street", "type": "string"}, + | { "name": "city", "type": {"type" : "bytes", --- End diff -- A "city" field with a decimal type is not a correct example. --- |
In reply to this post by qiuchenjian-2
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2687#discussion_r216123279 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java --- @@ -525,6 +559,11 @@ private static Field prepareFields(Schema.Field avroField) { int precision = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getPrecision(); int scale = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getScale(); return new Field(fieldName, DataTypes.createDecimalType(precision, scale)); + } else if (logicalType == null && childSchema.getObjectProp("logicalType") --- End diff -- 1) Why is this specific check required? If this case is for an invalid schema, Avro should validate it and throw an error. If Avro considers this case just like bytes, we can treat it the same way; Carbon need not have this specific check. 2) In the else case we should throw an exception, or leave it to the default case to throw the "bytes is not supported" exception. --- |
In reply to this post by qiuchenjian-2
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2687#discussion_r216123287 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java --- @@ -621,6 +660,11 @@ private static StructField prepareSubFields(String fieldName, Schema childSchema int precision = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getPrecision(); int scale = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getScale(); return new StructField(fieldName, DataTypes.createDecimalType(precision, scale)); + } else if (logicalType == null && childSchema.getObjectProp("logicalType") --- End diff -- Consider the previous comment in this case also. --- |
In reply to this post by qiuchenjian-2
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2687#discussion_r216123298 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java --- @@ -714,6 +758,11 @@ private static DataType getMappingDataTypeForCollectionRecord(Schema childSchema int precision = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getPrecision(); int scale = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getScale(); return DataTypes.createDecimalType(precision, scale); + } else if (logicalType == null && childSchema.getObjectProp("logicalType") --- End diff -- Consider the previous comment for decimal here also. --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2687 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/175/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2687 Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8414/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2687 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/343/ --- |
In reply to this post by qiuchenjian-2
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2687#discussion_r216127707 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java --- @@ -198,14 +199,35 @@ private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) { if (unionField.getType().equals(Schema.Type.NULL)) { continue; } - if (checkFieldValueType(unionField.getType(), fieldValue)) { + // Union may not contain more than one schema with the same type, + // except for the named types record,fixed and enum + // hence check for schema also in case of union of multiple record or enum or fixed type + if (validateUnionFieldValue(unionField.getType(), fieldValue, unionField)) { values[j] = avroFieldToObjectForUnionType(unionField, fieldValue, avroField); break; } j++; } out = new StructObject(values); break; + case BYTES: + // DECIMAL type is defined in Avro as a BYTE type with the logicalType property + // set to "decimal" and a specified precision and scale + // As binary type is not supported yet,value will be null --- End diff -- Please remove the comment. It's misleading. --- |
In reply to this post by qiuchenjian-2
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2687#discussion_r216127832 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java --- @@ -407,6 +423,24 @@ private Object avroFieldToObjectForUnionType(Schema avroField, Object fieldValue out = null; } break; + case BYTES: + // DECIMAL type is defined in Avro as a BYTE type with the logicalType property + // set to "decimal" and a specified precision and scale + // As binary type is not supported yet,value will be null + if (logicalType instanceof LogicalTypes.Decimal) { + BigDecimal dataValue = new BigDecimal(new BigInteger(((ByteBuffer) fieldValue).array()), --- End diff -- Please extract logic of constructing the BigDecimal value and validation to a separate method and reuse in both places --- |
In reply to this post by qiuchenjian-2
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2687#discussion_r216128016 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala --- @@ -884,4 +962,310 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(1728000, Row(1728000)))) } + test("test logical type decimal through Json") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "id", + | "type": {"type" : "bytes", + | "logicalType": "decimal", + | "precision": 5, + | "scale": 2 + | } + |} + | ] + |}""".stripMargin + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val logicalType = LogicalTypes.decimal(5, 2) + val decimal = new java.math.BigDecimal("12.8").setScale(2) + //get unscaled 2's complement bytearray + val bytes = + decimalConversion.toBytes(decimal, nn.getField("id").schema, logicalType) + val data = DatatypeConverter.printBase64Binary(bytes.array()) + val json1 = + s"""{"id":"$data"}""".stripMargin + val record = testUtil.jsonToAvro(json1, schema1) + val data1 = new String(record.get(0).asInstanceOf[ByteBuffer].array(), + CarbonCommonConstants.DEFAULT_CHARSET_CLASS) + val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1)) + val avroRec = new GenericData. 
Record(nn) + avroRec.put("id", bytes1) + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(avroRec) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkExistence(sql("select * from sdkOutputTable"), true, "12.80") + } + + test("test logical type decimal through Json with big decimal value") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "dec_field", + | "type": {"type" : "bytes", + | "logicalType": "decimal", + | "precision": 30, + | "scale": 10 + | } + |} + | ] + |}""".stripMargin + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val logicalType = LogicalTypes.decimal(30, 10) + val decimal = new java.math.BigDecimal("12672346879023.845789").setScale(10) + //get unscaled 2's complement bytearray + val bytes = + decimalConversion.toBytes(decimal, nn.getField("dec_field").schema, logicalType) + val data = DatatypeConverter.printBase64Binary(bytes.array()) + val json1 = + s"""{"dec_field":"$data"}""".stripMargin + val record = testUtil.jsonToAvro(json1, schema1) + val data1 = new String(record.get(0).asInstanceOf[ByteBuffer].array(), + CarbonCommonConstants.DEFAULT_CHARSET_CLASS) + val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1)) + val avroRec = new GenericData. 
Record(nn) + avroRec.put("dec_field", bytes1) + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(avroRec) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkExistence(sql("select * from sdkOutputTable"), true, "12672346879023.8457890000") + } + + test("test logical type decimal through Json with negative decimal value") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "dec_field", + | "type": {"type" : "bytes", + | "logicalType": "decimal", + | "precision": 30, + | "scale": 6 + | } + |} + | ] + |}""".stripMargin + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val logicalType = LogicalTypes.decimal(30, 6) + val decimal = new java.math.BigDecimal("-12672346879023.845").setScale(6) + //get unscaled 2's complement bytearray + val bytes = + decimalConversion.toBytes(decimal, nn.getField("dec_field").schema, logicalType) + val data = DatatypeConverter.printBase64Binary(bytes.array()) + val json1 = + s"""{"dec_field":"$data"}""".stripMargin + val record = testUtil.jsonToAvro(json1, schema1) + val data1 = new String(record.get(0).asInstanceOf[ByteBuffer].array(), + CarbonCommonConstants.DEFAULT_CHARSET_CLASS) + val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1)) + val avroRec = new GenericData. 
Record(nn) + avroRec.put("dec_field", bytes1) + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(avroRec) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkExistence(sql("select * from sdkOutputTable"), true, "-12672346879023.845000") + } + + test("test logical type decimal through Avro") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "dec_field", + | "type": {"type" : "bytes", + | "logicalType": "decimal", + | "precision": 5, + | "scale": 2 + | } + |} + | ] + |}""".stripMargin + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val logicalType = LogicalTypes.decimal(5, 2) + val decimal = new java.math.BigDecimal("12.8").setScale(2) + //get unscaled 2's complement bytearray + val bytes = + decimalConversion.toBytes(decimal, nn.getField("dec_field").schema, logicalType) + val data = DatatypeConverter.printBase64Binary(bytes.array()) + val json1 = + s"""{"dec_field":"$data"}""".stripMargin + val avroRec = new GenericData. 
Record(nn) + avroRec.put("dec_field", bytes) + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(avroRec) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkExistence(sql("select * from sdkOutputTable"), true, "12.80") + } + + test("test logical type decimal with data having greater precision than specified precision") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "dec_field", + | "type": {"type" : "bytes", + | "logicalType": "decimal", + | "precision": 5, + | "scale": 2 + | } + |} + | ] + |}""".stripMargin + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val logicalType = LogicalTypes.decimal(5, 2) + val decimal = new java.math.BigDecimal("1218").setScale(2) + //get unscaled 2's complement bytearray + val bytes = + decimalConversion.toBytes(decimal, nn.getField("dec_field").schema, logicalType) + val data = DatatypeConverter.printBase64Binary(bytes.array()) + val json1 = + s"""{"dec_field":"$data"}""".stripMargin + val avroRec = new GenericData. 
Record(nn) + avroRec.put("dec_field", bytes) + val exception1 = intercept[Exception] { + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(avroRec) + writer.close() + } + assert(exception1.getMessage + .contains("Data Loading failed as value Precision 6 is greater than specified Precision 5 in Avro Schema")) + } + + test("test union with multiple record type") { + sql("drop table if exists sdkOutputTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath)) + val schema1 = + """{ + | "namespace": "test.avro", + | "type": "record", + | "name": "NewCar2", + | "fields": [ + | { + | "name": "optionalExtra", + | "type": ["null",{ + | "type":"record", + | "name":"Stereo", + | "fields" :[{ + | "name":"make", + | "type":"string" + | }, + | { + | "name":"speakers", + | "type":"int" + | }] + | },{ + | "type":"record", + | "name":"LeatherTrim", + | "fields":[{ + | "name":"colour", + | "type":"string" + | }] + | }], + | "default":null + | }] + | + |}""".stripMargin + + val json1 = + """{"optionalExtra":{"test.avro.LeatherTrim":{"colour":"ab"}}}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val record = testUtil.jsonToAvro(json1, schema1) + + val writer = CarbonWriter.builder + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn) + writer.write(record) + writer.close() + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), + Seq(Row(Row(Row(null,null),Row("ab"))))) + } + + test("test union with multiple Enum type") { --- End diff -- Can you add a test case that reads using the data source file format (the "using carbon" syntax with a schema; refer to SparkCarbonDataSourceTest)? This will help users understand how to define schemas for Avro logical types and unions. --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2687 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/178/ --- |
Free forum by Nabble | Edit this page |