[GitHub] carbondata pull request #2687: [CARBONDATA-2876]Fix Avro decimal datatype wi...

[GitHub] carbondata issue #2687: [CARBONDATA-2876]Fix Avro decimal datatype with prec...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2687
 
    Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8386/



---

[GitHub] carbondata issue #2687: [CARBONDATA-2876]Fix Avro decimal datatype with prec...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2687
 
    Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/315/



---

[GitHub] carbondata issue #2687: [CARBONDATA-2876]Fix Avro decimal datatype with prec...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2687
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/155/



---

[GitHub] carbondata issue #2687: [CARBONDATA-2876]Fix Avro decimal datatype with prec...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2687
 
    Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8394/



---

[GitHub] carbondata issue #2687: [CARBONDATA-2876]Fix Avro decimal datatype with prec...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2687
 
    Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/323/



---

[GitHub] carbondata pull request #2687: [CARBONDATA-2876]Fix Avro decimal datatype wi...

Github user KanakaKumar commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2687#discussion_r216122505
 
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala ---
    @@ -884,4 +962,220 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
         checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(1728000, Row(1728000))))
       }
     
    +  test("test logical type decimal through Json") {
    +    sql("drop table if exists sdkOutputTable")
    +    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
    +    val schema1 =
    +      """{
    +        | "namespace": "com.apache.schema",
    +        | "type": "record",
    +        | "name": "StudentActivity",
    +        | "fields": [
    +        | {
    +        | "name": "id",
    +        | "type": {"type" : "bytes",
    +        |                     "logicalType": "decimal",
    +        |                     "precision": 5,
    +        |                     "scale": 2
    +        |                    }
    +        |}
    +        | ]
    +        |}""".stripMargin
    +    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
    +    val logicalType = LogicalTypes.decimal(5, 2)
    +    val decimal = new java.math.BigDecimal("12.8").setScale(2)
    +    //get unscaled 2's complement bytearray
    +    val bytes =
    +      decimalConversion.toBytes(decimal, nn.getField("id").schema, logicalType)
    +    val data = DatatypeConverter.printBase64Binary(bytes.array())
    +    val json1 =
    +      s"""{"id":"$data"}""".stripMargin
    +    val record = testUtil.jsonToAvro(json1, schema1)
    +    val data1 = new String(record.get(0).asInstanceOf[ByteBuffer].array(),
    +      CarbonCommonConstants.DEFAULT_CHARSET_CLASS)
    +    val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1))
    +    val avroRec = new GenericData.Record(nn)
    +    avroRec.put("id", bytes1)
    +    val writer = CarbonWriter.builder
    +      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
    +    writer.write(avroRec)
    +    writer.close()
    +    sql(
    +      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
    +         |'carbondata' LOCATION
    +         |'$writerPath' """.stripMargin)
    +    checkExistence(sql("select * from sdkOutputTable"), true, "12.80")
    +  }
    +
    +  test("test logical type decimal through Avro") {
    +    sql("drop table if exists sdkOutputTable")
    +    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
    +    val schema1 =
    +      """{
    +        | "namespace": "com.apache.schema",
    +        | "type": "record",
    +        | "name": "StudentActivity",
    +        | "fields": [
    +        | {
    +        | "name": "id",
    +        | "type": {"type" : "bytes",
    +        |                     "logicalType": "decimal",
    +        |                     "precision": 5,
    +        |                     "scale": 2
    +        |                    }
    +        |}
    +        | ]
    +        |}""".stripMargin
    +    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
    +    val logicalType = LogicalTypes.decimal(5, 2)
    +    val decimal = new java.math.BigDecimal("12.8").setScale(2)
    +    //get unscaled 2's complement bytearray
    +    val bytes =
    +      decimalConversion.toBytes(decimal, nn.getField("id").schema, logicalType)
    +    val data = DatatypeConverter.printBase64Binary(bytes.array())
    +    val json1 =
    +      s"""{"id":"$data"}""".stripMargin
    +    val avroRec = new GenericData.Record(nn)
    +    avroRec.put("id", bytes)
    +    val writer = CarbonWriter.builder
    +      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
    +    writer.write(avroRec)
    +    writer.close()
    +    sql(
    +      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
    +         |'carbondata' LOCATION
    +         |'$writerPath' """.stripMargin)
    +    checkExistence(sql("select * from sdkOutputTable"), true, "12.80")
    +  }
    +
    +  test("test logical type decimal with data having greater precision") {
    --- End diff --
   
    Please add a test to verify boundary values, like negative and large big decimal values (big precision needs a long to store the unscaled value).
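
    For example, a negative high-precision value could be encoded the same way the existing tests do. A minimal sketch (decimalConversion and nn are the test fixtures from the diff above; it assumes the schema declares a decimal(30, 6) field named dec_field):

        val logicalType = LogicalTypes.decimal(30, 6)
        val decimal = new java.math.BigDecimal("-12672346879023.845").setScale(6)
        // unscaled two's-complement byte array; this value no longer fits in a long
        val bytes = decimalConversion.toBytes(decimal, nn.getField("dec_field").schema, logicalType)
        val avroRec = new GenericData.Record(nn)
        avroRec.put("dec_field", bytes)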


---

[GitHub] carbondata pull request #2687: [CARBONDATA-2876]Fix Avro decimal datatype wi...

Github user KanakaKumar commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2687#discussion_r216122590
 
    --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java ---
    @@ -198,14 +199,35 @@ private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) {
               if (unionField.getType().equals(Schema.Type.NULL)) {
                 continue;
               }
    -          if (checkFieldValueType(unionField.getType(), fieldValue)) {
    +          // Union may not contain more than one schema with the same type,
    +          // except for the named types record, fixed and enum
    +          // hence check for schema also in case of union of multiple record or enum or fixed type
    +          if (checkFieldValueType(unionField.getType(), fieldValue, unionField)) {
                 values[j] = avroFieldToObjectForUnionType(unionField, fieldValue, avroField);
                 break;
               }
               j++;
             }
             out = new StructObject(values);
             break;
    +      case BYTES:
    +        // DECIMAL type is defined in Avro as a BYTE type with the logicalType property
    +        // set to "decimal" and a specified precision and scale
    +        // As binary type is not supported yet,value will be null
    --- End diff --
   
    We should throw an unsupported data type exception for bytes; don't store null.
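
    A minimal sketch of that shape, written in Scala for consistency with the tests quoted in this thread (avroFieldToObject itself is Java, and the exception type here is an assumption, not the PR's actual choice):

        import java.math.{BigDecimal, BigInteger}
        import java.nio.ByteBuffer
        import org.apache.avro.{LogicalType, LogicalTypes}

        def bytesFieldToObject(fieldValue: ByteBuffer, logicalType: LogicalType): Object = {
          logicalType match {
            case d: LogicalTypes.Decimal =>
              // decimal: unscaled two's-complement bytes plus the scale from the schema
              new BigDecimal(new BigInteger(fieldValue.array()), d.getScale)
            case _ =>
              throw new UnsupportedOperationException(
                "Avro BYTES without the decimal logical type is not supported yet")
          }
        }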


---

[GitHub] carbondata pull request #2687: [CARBONDATA-2876]Fix Avro decimal datatype wi...

Github user KanakaKumar commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2687#discussion_r216122672
 
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala ---
    @@ -884,4 +962,220 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
         checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(1728000, Row(1728000))))
       }
     
    +  test("test logical type decimal through Json") {
    +    sql("drop table if exists sdkOutputTable")
    +    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
    +    val schema1 =
    +      """{
    +        | "namespace": "com.apache.schema",
    +        | "type": "record",
    +        | "name": "StudentActivity",
    +        | "fields": [
    +        | {
    +        | "name": "id",
    +        | "type": {"type" : "bytes",
    +        |                     "logicalType": "decimal",
    +        |                     "precision": 5,
    +        |                     "scale": 2
    +        |                    }
    +        |}
    +        | ]
    +        |}""".stripMargin
    +    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
    +    val logicalType = LogicalTypes.decimal(5, 2)
    +    val decimal = new java.math.BigDecimal("12.8").setScale(2)
    +    //get unscaled 2's complement bytearray
    +    val bytes =
    +      decimalConversion.toBytes(decimal, nn.getField("id").schema, logicalType)
    +    val data = DatatypeConverter.printBase64Binary(bytes.array())
    +    val json1 =
    +      s"""{"id":"$data"}""".stripMargin
    +    val record = testUtil.jsonToAvro(json1, schema1)
    +    val data1 = new String(record.get(0).asInstanceOf[ByteBuffer].array(),
    +      CarbonCommonConstants.DEFAULT_CHARSET_CLASS)
    +    val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1))
    +    val avroRec = new GenericData.Record(nn)
    +    avroRec.put("id", bytes1)
    +    val writer = CarbonWriter.builder
    +      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
    +    writer.write(avroRec)
    +    writer.close()
    +    sql(
    +      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
    +         |'carbondata' LOCATION
    +         |'$writerPath' """.stripMargin)
    +    checkExistence(sql("select * from sdkOutputTable"), true, "12.80")
    +  }
    +
    +  test("test logical type decimal through Avro") {
    +    sql("drop table if exists sdkOutputTable")
    +    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
    +    val schema1 =
    +      """{
    +        | "namespace": "com.apache.schema",
    +        | "type": "record",
    +        | "name": "StudentActivity",
    +        | "fields": [
    +        | {
    +        | "name": "id",
    +        | "type": {"type" : "bytes",
    +        |                     "logicalType": "decimal",
    +        |                     "precision": 5,
    +        |                     "scale": 2
    +        |                    }
    +        |}
    +        | ]
    +        |}""".stripMargin
    +    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
    +    val logicalType = LogicalTypes.decimal(5, 2)
    +    val decimal = new java.math.BigDecimal("12.8").setScale(2)
    +    //get unscaled 2's complement bytearray
    +    val bytes =
    +      decimalConversion.toBytes(decimal, nn.getField("id").schema, logicalType)
    +    val data = DatatypeConverter.printBase64Binary(bytes.array())
    +    val json1 =
    +      s"""{"id":"$data"}""".stripMargin
    +    val avroRec = new GenericData.Record(nn)
    +    avroRec.put("id", bytes)
    +    val writer = CarbonWriter.builder
    +      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
    +    writer.write(avroRec)
    +    writer.close()
    +    sql(
    +      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
    +         |'carbondata' LOCATION
    +         |'$writerPath' """.stripMargin)
    +    checkExistence(sql("select * from sdkOutputTable"), true, "12.80")
    +  }
    +
    +  test("test logical type decimal with data having greater precision") {
    +    sql("drop table if exists sdkOutputTable")
    +    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
    +    val schema1 =
    +      """{
    +        | "namespace": "com.apache.schema",
    +        | "type": "record",
    +        | "name": "StudentActivity",
    +        | "fields": [
    +        | {
    +        | "name": "id",
    +        | "type": {"type" : "bytes",
    +        |                     "logicalType": "decimal",
    +        |                     "precision": 5,
    +        |                     "scale": 2
    +        |                    }
    +        |}
    +        | ]
    +        |}""".stripMargin
    +    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
    +    val logicalType = LogicalTypes.decimal(5, 2)
    +    val decimal = new java.math.BigDecimal("1218").setScale(2)
    +    //get unscaled 2's complement bytearray
    +    val bytes =
    +      decimalConversion.toBytes(decimal, nn.getField("id").schema, logicalType)
    +    val data = DatatypeConverter.printBase64Binary(bytes.array())
    +    val json1 =
    +      s"""{"id":"$data"}""".stripMargin
    +    val avroRec = new GenericData.Record(nn)
    +    avroRec.put("id", bytes)
    +    val exception1 = intercept[Exception] {
    +    val writer = CarbonWriter.builder
    +      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
    +    writer.write(avroRec)
    +    writer.close()
    +    }
    +    assert(exception1.getMessage
    +      .contains("Data Loading failed as value Precision 6 is greater than specified Precision 5 in Avro Schema"))
    +  }
    +
    +  test("test union with multiple record type") {
    +    sql("drop table if exists sdkOutputTable")
    +    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
    +    val schema1 =
    +      """{
    +        | "namespace": "test.avro",
    +        | "type": "record",
    +        | "name": "NewCar2",
    +        | "fields": [
    +        |      {
    +        |  "name": "optionalExtra",
    +        |    "type": ["null",{
    +        |       "type":"record",
    +        |       "name":"Stereo",
    +        |       "fields" :[{
    +        |       "name":"make",
    +        |       "type":"string"
    +        |       },
    +        |       {
    +        |       "name":"speakers",
    +        |       "type":"int"
    +        |       }]
    +        |       },{
    +        |       "type":"record",
    +        |       "name":"LeatherTrim",
    +        |       "fields":[{
    +        |       "name":"colour",
    +        |       "type":"string"
    +        |       }]
    +        |       }],
    +        |       "default":null
    +        |       }]
    +        |
    +        |}""".stripMargin
    +
    +    val json1 =
    +      """{"optionalExtra":{"test.avro.LeatherTrim":{"colour":"ab"}}}""".stripMargin
    +
    +    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
    +    val record = testUtil.jsonToAvro(json1, schema1)
    +
    +    val writer = CarbonWriter.builder
    +      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
    +    writer.write(record)
    +    writer.close()
    +    sql(
    +      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
    +         |'carbondata' LOCATION
    +         |'$writerPath' """.stripMargin)
    +    checkAnswer(sql("select * from sdkOutputTable"),
    +      Seq(Row(Row(Row(null,null),Row("ab")))))
    +  }
    +
    +  test("test union with multiple Enum type") {
    +    sql("drop table if exists sdkOutputTable")
    +    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
    +    val schema1 =
    +      """{
    +        | "namespace": "test.avro",
    +        | "type": "record",
    +        | "name": "Union_data3",
    +        | "fields": [
    +        |      {
    +        |  "name": "emp_id",
    --- End diff --
   
    Please use related names for the field; emp_id and union types are not related.


---

[GitHub] carbondata pull request #2687: [CARBONDATA-2876]Fix Avro decimal datatype wi...

Github user KanakaKumar commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2687#discussion_r216122724
 
    --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java ---
    @@ -218,9 +240,10 @@ private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) {
        *
        * @param type
        * @param fieldValue
    +   * @param unionField
        * @return
        */
    -  private boolean checkFieldValueType(Schema.Type type, Object fieldValue) {
    +  private boolean checkFieldValueType(Schema.Type type, Object fieldValue, Schema unionField) {
    --- End diff --
   
    Rename the method to validateUnionFieldValue().


---

[GitHub] carbondata pull request #2687: [CARBONDATA-2876]Fix Avro decimal datatype wi...

Github user KanakaKumar commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2687#discussion_r216123042
 
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala ---
    @@ -728,14 +748,46 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
             |]}
           """.stripMargin
     
    -    val json1 = """ {"name":"bob", "age":10.24, "address" : {"street":"abc", "city":"32"}} """
    -
         val nn = new org.apache.avro.Schema.Parser().parse(schema1)
    +
    +    val logicalType = LogicalTypes.decimal(4, 2)
    +    val decimal1 = new java.math.BigDecimal("32").setScale(2)
    +    //get unscaled 2's complement bytearray
    +    val bytes =
    +      decimalConversion.toBytes(decimal1, nn.getField("address").schema, logicalType)
    +    val data = DatatypeConverter.printBase64Binary(bytes.array())
    +    val json1 = s""" {"name":"bob", "age":10.24, "address" : {"street":"abc", "city":"$data"}} """
         val record = testUtil.jsonToAvro(json1, schema1)
     
    +    val jsonData = new String(record.get(2).asInstanceOf[GenericData.Record].get(1)
    +      .asInstanceOf[ByteBuffer].array(),
    +      CarbonCommonConstants.DEFAULT_CHARSET_CLASS)
    +    val bytesValue = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(jsonData))
    +    val mySchema =
    +      """
    +        |{"name": "address",
    +        | "type": "record",
    +        | "fields": [
    +        |  { "name": "street", "type": "string"},
    +        |  { "name": "city", "type": {"type" : "bytes",
    --- End diff --
   
    city as a decimal is not a correct example.


---

[GitHub] carbondata pull request #2687: [CARBONDATA-2876]Fix Avro decimal datatype wi...

Github user KanakaKumar commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2687#discussion_r216123279
 
    --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java ---
    @@ -525,6 +559,11 @@ private static Field prepareFields(Schema.Field avroField) {
               int precision = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getPrecision();
               int scale = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getScale();
               return new Field(fieldName, DataTypes.createDecimalType(precision, scale));
    +        } else if (logicalType == null && childSchema.getObjectProp("logicalType")
    --- End diff --
   
    1) Why is this specific check required?
    If this case covers an invalid schema, Avro should validate it and throw an error. If Avro treats this case just like plain bytes, we can do the same; Carbon need not have this specific check.

    2) In the else case we should throw an exception, or leave it to the default case to throw a "bytes is not supported" exception.
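
    For the else branch, a minimal sketch of the suggested fallback, in Scala for consistency with the tests quoted here (prepareFields itself lives in the Java AvroCarbonWriter; Field and DataTypes are the Carbon classes already used in the diff above):

        import org.apache.avro.{LogicalTypes, Schema}

        def bytesFieldToCarbonField(fieldName: String, childSchema: Schema): Field =
          childSchema.getLogicalType match {
            case d: LogicalTypes.Decimal =>
              new Field(fieldName, DataTypes.createDecimalType(d.getPrecision, d.getScale))
            case _ =>
              // no valid decimal logical type: fail instead of silently mapping bytes
              throw new UnsupportedOperationException(
                fieldName + ": Avro BYTES is supported only with the decimal logical type")
          }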



---

[GitHub] carbondata pull request #2687: [CARBONDATA-2876]Fix Avro decimal datatype wi...

Github user KanakaKumar commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2687#discussion_r216123287
 
    --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java ---
    @@ -621,6 +660,11 @@ private static StructField prepareSubFields(String fieldName, Schema childSchema
               int precision = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getPrecision();
               int scale = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getScale();
               return new StructField(fieldName, DataTypes.createDecimalType(precision, scale));
    +        } else if (logicalType == null && childSchema.getObjectProp("logicalType")
    --- End diff --
   
    Consider the previous comment in this case also.


---

[GitHub] carbondata pull request #2687: [CARBONDATA-2876]Fix Avro decimal datatype wi...

Github user KanakaKumar commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2687#discussion_r216123298
 
    --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java ---
    @@ -714,6 +758,11 @@ private static DataType getMappingDataTypeForCollectionRecord(Schema childSchema
               int precision = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getPrecision();
               int scale = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getScale();
               return DataTypes.createDecimalType(precision, scale);
    +        } else if (logicalType == null && childSchema.getObjectProp("logicalType")
    --- End diff --
   
    Consider the previous comment for decimal here also.


---

[GitHub] carbondata issue #2687: [CARBONDATA-2876]Fix Avro decimal datatype with prec...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2687
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/175/



---

[GitHub] carbondata issue #2687: [CARBONDATA-2876]Fix Avro decimal datatype with prec...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2687
 
    Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8414/



---

[GitHub] carbondata issue #2687: [CARBONDATA-2876]Fix Avro decimal datatype with prec...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2687
 
    Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/343/



---

[GitHub] carbondata pull request #2687: [CARBONDATA-2876]Fix Avro decimal datatype wi...

Github user KanakaKumar commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2687#discussion_r216127707
 
    --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java ---
    @@ -198,14 +199,35 @@ private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) {
               if (unionField.getType().equals(Schema.Type.NULL)) {
                 continue;
               }
    -          if (checkFieldValueType(unionField.getType(), fieldValue)) {
    +          // Union may not contain more than one schema with the same type,
    +          // except for the named types record, fixed and enum
    +          // hence check for schema also in case of union of multiple record or enum or fixed type
    +          if (validateUnionFieldValue(unionField.getType(), fieldValue, unionField)) {
                 values[j] = avroFieldToObjectForUnionType(unionField, fieldValue, avroField);
                 break;
               }
               j++;
             }
             out = new StructObject(values);
             break;
    +      case BYTES:
    +        // DECIMAL type is defined in Avro as a BYTE type with the logicalType property
    +        // set to "decimal" and a specified precision and scale
    +        // As binary type is not supported yet,value will be null
    --- End diff --
   
    Please remove this comment; it's misleading.


---

[GitHub] carbondata pull request #2687: [CARBONDATA-2876]Fix Avro decimal datatype wi...

Github user KanakaKumar commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2687#discussion_r216127832
 
    --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java ---
    @@ -407,6 +423,24 @@ private Object avroFieldToObjectForUnionType(Schema avroField, Object fieldValue
               out = null;
             }
             break;
    +      case BYTES:
    +        // DECIMAL type is defined in Avro as a BYTE type with the logicalType property
    +        // set to "decimal" and a specified precision and scale
    +        // As binary type is not supported yet,value will be null
    +        if (logicalType instanceof LogicalTypes.Decimal) {
    +          BigDecimal dataValue = new BigDecimal(new BigInteger(((ByteBuffer) fieldValue).array()),
    --- End diff --
   
    Please extract the logic for constructing and validating the BigDecimal value into a separate method and reuse it in both places.
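
    Something along these lines, sketched in Scala (AvroCarbonWriter itself is Java, and the exception type is assumed; the message mirrors the one asserted in the tests in this thread):

        import java.math.{BigDecimal, BigInteger}
        import java.nio.ByteBuffer
        import org.apache.avro.LogicalTypes

        def extractDecimalValue(fieldValue: ByteBuffer, decimalType: LogicalTypes.Decimal): BigDecimal = {
          // rebuild the value from the unscaled two's-complement bytes and the schema's scale
          val value = new BigDecimal(new BigInteger(fieldValue.array()), decimalType.getScale)
          if (value.precision > decimalType.getPrecision) {
            throw new IllegalArgumentException(
              s"Data Loading failed as value Precision ${value.precision} is greater than " +
                s"specified Precision ${decimalType.getPrecision} in Avro Schema")
          }
          value
        }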


---

[GitHub] carbondata pull request #2687: [CARBONDATA-2876]Fix Avro decimal datatype wi...

Github user KanakaKumar commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2687#discussion_r216128016
 
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala ---
    @@ -884,4 +962,310 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
         checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(1728000, Row(1728000))))
       }
     
    +  test("test logical type decimal through Json") {
    +    sql("drop table if exists sdkOutputTable")
    +    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
    +    val schema1 =
    +      """{
    +        | "namespace": "com.apache.schema",
    +        | "type": "record",
    +        | "name": "StudentActivity",
    +        | "fields": [
    +        | {
    +        | "name": "id",
    +        | "type": {"type" : "bytes",
    +        |                     "logicalType": "decimal",
    +        |                     "precision": 5,
    +        |                     "scale": 2
    +        |                    }
    +        |}
    +        | ]
    +        |}""".stripMargin
    +    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
    +    val logicalType = LogicalTypes.decimal(5, 2)
    +    val decimal = new java.math.BigDecimal("12.8").setScale(2)
    +    //get unscaled 2's complement bytearray
    +    val bytes =
    +      decimalConversion.toBytes(decimal, nn.getField("id").schema, logicalType)
    +    val data = DatatypeConverter.printBase64Binary(bytes.array())
    +    val json1 =
    +      s"""{"id":"$data"}""".stripMargin
    +    val record = testUtil.jsonToAvro(json1, schema1)
    +    val data1 = new String(record.get(0).asInstanceOf[ByteBuffer].array(),
    +      CarbonCommonConstants.DEFAULT_CHARSET_CLASS)
    +    val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1))
    +    val avroRec = new GenericData.Record(nn)
    +    avroRec.put("id", bytes1)
    +    val writer = CarbonWriter.builder
    +      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
    +    writer.write(avroRec)
    +    writer.close()
    +    sql(
    +      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
    +         |'carbondata' LOCATION
    +         |'$writerPath' """.stripMargin)
    +    checkExistence(sql("select * from sdkOutputTable"), true, "12.80")
    +  }
    +
    +  test("test logical type decimal through Json with big decimal value") {
    +    sql("drop table if exists sdkOutputTable")
    +    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
    +    val schema1 =
    +      """{
    +        | "namespace": "com.apache.schema",
    +        | "type": "record",
    +        | "name": "StudentActivity",
    +        | "fields": [
    +        | {
    +        | "name": "dec_field",
    +        | "type": {"type" : "bytes",
    +        |                     "logicalType": "decimal",
    +        |                     "precision": 30,
    +        |                     "scale": 10
    +        |                    }
    +        |}
    +        | ]
    +        |}""".stripMargin
    +    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
    +    val logicalType = LogicalTypes.decimal(30, 10)
    +    val decimal = new java.math.BigDecimal("12672346879023.845789").setScale(10)
    +    //get unscaled 2's complement bytearray
    +    val bytes =
    +      decimalConversion.toBytes(decimal, nn.getField("dec_field").schema, logicalType)
    +    val data = DatatypeConverter.printBase64Binary(bytes.array())
    +    val json1 =
    +      s"""{"dec_field":"$data"}""".stripMargin
    +    val record = testUtil.jsonToAvro(json1, schema1)
    +    val data1 = new String(record.get(0).asInstanceOf[ByteBuffer].array(),
    +      CarbonCommonConstants.DEFAULT_CHARSET_CLASS)
    +    val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1))
    +    val avroRec = new GenericData.Record(nn)
    +    avroRec.put("dec_field", bytes1)
    +    val writer = CarbonWriter.builder
    +      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
    +    writer.write(avroRec)
    +    writer.close()
    +    sql(
    +      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
    +         |'carbondata' LOCATION
    +         |'$writerPath' """.stripMargin)
    +    checkExistence(sql("select * from sdkOutputTable"), true, "12672346879023.8457890000")
    +  }
    +
    +  test("test logical type decimal through Json with negative decimal value") {
    +    sql("drop table if exists sdkOutputTable")
    +    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
    +    val schema1 =
    +      """{
    +        | "namespace": "com.apache.schema",
    +        | "type": "record",
    +        | "name": "StudentActivity",
    +        | "fields": [
    +        | {
    +        | "name": "dec_field",
    +        | "type": {"type" : "bytes",
    +        |                     "logicalType": "decimal",
    +        |                     "precision": 30,
    +        |                     "scale": 6
    +        |                    }
    +        |}
    +        | ]
    +        |}""".stripMargin
    +    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
    +    val logicalType = LogicalTypes.decimal(30, 6)
    +    val decimal = new java.math.BigDecimal("-12672346879023.845").setScale(6)
    +    //get unscaled 2's complement bytearray
    +    val bytes =
    +      decimalConversion.toBytes(decimal, nn.getField("dec_field").schema, logicalType)
    +    val data = DatatypeConverter.printBase64Binary(bytes.array())
    +    val json1 =
    +      s"""{"dec_field":"$data"}""".stripMargin
    +    val record = testUtil.jsonToAvro(json1, schema1)
    +    val data1 = new String(record.get(0).asInstanceOf[ByteBuffer].array(),
    +      CarbonCommonConstants.DEFAULT_CHARSET_CLASS)
    +    val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1))
    +    val avroRec = new GenericData.Record(nn)
    +    avroRec.put("dec_field", bytes1)
    +    val writer = CarbonWriter.builder
    +      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
    +    writer.write(avroRec)
    +    writer.close()
    +    sql(
    +      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
    +         |'carbondata' LOCATION
    +         |'$writerPath' """.stripMargin)
    +    checkExistence(sql("select * from sdkOutputTable"), true, "-12672346879023.845000")
    +  }
    +
    +  test("test logical type decimal through Avro") {
    +    sql("drop table if exists sdkOutputTable")
    +    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
    +    val schema1 =
    +      """{
    +        | "namespace": "com.apache.schema",
    +        | "type": "record",
    +        | "name": "StudentActivity",
    +        | "fields": [
    +        | {
    +        | "name": "dec_field",
    +        | "type": {"type" : "bytes",
    +        |                     "logicalType": "decimal",
    +        |                     "precision": 5,
    +        |                     "scale": 2
    +        |                    }
    +        |}
    +        | ]
    +        |}""".stripMargin
    +    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
    +    val logicalType = LogicalTypes.decimal(5, 2)
    +    val decimal = new java.math.BigDecimal("12.8").setScale(2)
    +    //get unscaled 2's complement bytearray
    +    val bytes =
    +      decimalConversion.toBytes(decimal, nn.getField("dec_field").schema, logicalType)
    +    val data = DatatypeConverter.printBase64Binary(bytes.array())
    +    val json1 =
    +      s"""{"dec_field":"$data"}""".stripMargin
    +    val avroRec = new GenericData.Record(nn)
    +    avroRec.put("dec_field", bytes)
    +    val writer = CarbonWriter.builder
    +      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
    +    writer.write(avroRec)
    +    writer.close()
    +    sql(
    +      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
    +         |'carbondata' LOCATION
    +         |'$writerPath' """.stripMargin)
    +    checkExistence(sql("select * from sdkOutputTable"), true, "12.80")
    +  }
    +
    +  test("test logical type decimal with data having greater precision than specified precision") {
    +    sql("drop table if exists sdkOutputTable")
    +    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
    +    val schema1 =
    +      """{
    +        | "namespace": "com.apache.schema",
    +        | "type": "record",
    +        | "name": "StudentActivity",
    +        | "fields": [
    +        | {
    +        | "name": "dec_field",
    +        | "type": {"type" : "bytes",
    +        |                     "logicalType": "decimal",
    +        |                     "precision": 5,
    +        |                     "scale": 2
    +        |                    }
    +        |}
    +        | ]
    +        |}""".stripMargin
    +    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
    +    val logicalType = LogicalTypes.decimal(5, 2)
    +    val decimal = new java.math.BigDecimal("1218").setScale(2)
    +    //get unscaled 2's complement bytearray
    +    val bytes =
    +      decimalConversion.toBytes(decimal, nn.getField("dec_field").schema, logicalType)
    +    val data = DatatypeConverter.printBase64Binary(bytes.array())
    +    val json1 =
    +      s"""{"dec_field":"$data"}""".stripMargin
    +    val avroRec = new GenericData.Record(nn)
    +    avroRec.put("dec_field", bytes)
    +    val exception1 = intercept[Exception] {
    +    val writer = CarbonWriter.builder
    +      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
    +    writer.write(avroRec)
    +    writer.close()
    +    }
    +    assert(exception1.getMessage
    +      .contains("Data Loading failed as value Precision 6 is greater than specified Precision 5 in Avro Schema"))
    +  }
    +
    +  test("test union with multiple record type") {
    +    sql("drop table if exists sdkOutputTable")
    +    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
    +    val schema1 =
    +      """{
    +        | "namespace": "test.avro",
    +        | "type": "record",
    +        | "name": "NewCar2",
    +        | "fields": [
    +        |      {
    +        |  "name": "optionalExtra",
    +        |    "type": ["null",{
    +        |       "type":"record",
    +        |       "name":"Stereo",
    +        |       "fields" :[{
    +        |       "name":"make",
    +        |       "type":"string"
    +        |       },
    +        |       {
    +        |       "name":"speakers",
    +        |       "type":"int"
    +        |       }]
    +        |       },{
    +        |       "type":"record",
    +        |       "name":"LeatherTrim",
    +        |       "fields":[{
    +        |       "name":"colour",
    +        |       "type":"string"
    +        |       }]
    +        |       }],
    +        |       "default":null
    +        |       }]
    +        |
    +        |}""".stripMargin
    +
    +    val json1 =
    +      """{"optionalExtra":{"test.avro.LeatherTrim":{"colour":"ab"}}}""".stripMargin
    +
    +    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
    +    val record = testUtil.jsonToAvro(json1, schema1)
    +
    +    val writer = CarbonWriter.builder
    +      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
    +    writer.write(record)
    +    writer.close()
    +    sql(
    +      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
    +         |'carbondata' LOCATION
    +         |'$writerPath' """.stripMargin)
    +    checkAnswer(sql("select * from sdkOutputTable"),
    +      Seq(Row(Row(Row(null,null),Row("ab")))))
    +  }
    +
    +  test("test union with multiple Enum type") {
    --- End diff --
   
    Can you add a test case that reads using the data source file format? (Syntax: "using carbon" with a schema; refer to SparkCarbonDataSourceTest.)
    This will help users see how to define a schema for Avro logical types & unions.
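
    A sketch of such a test, assuming the "using carbon" syntax from SparkCarbonDataSourceTest and the decimal(5,2) field written by the tests above:

        sql("drop table if exists sdkTable")
        sql(
          s"""CREATE TABLE sdkTable (dec_field decimal(5,2))
             |USING carbon LOCATION '$writerPath'""".stripMargin)
        checkExistence(sql("select * from sdkTable"), true, "12.80")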


---

[GitHub] carbondata issue #2687: [CARBONDATA-2876]Fix Avro decimal datatype with prec...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2687
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/178/



---