If the byte length of a column exceeds 32768, data loading fails when enable.unsafe.sort=true


xm_zzc
Hi, dev:
  I am using Spark 2.1 + CarbonData 1.2, and I find that if
enable.unsafe.sort=true and the byte length of a column exceeds 32768, the
data load fails.

My test code:
    import org.apache.spark.sql.SaveMode

    // sb is not defined in the original post; assume a StringBuilder holding
    // enough characters that longStr's UTF-8 byte length exceeds 32768.
    val sb = new StringBuilder("a" * 40000)
    val longStr = sb.toString()  // the getBytes length of longStr exceeds 32768
    println(longStr.length())
    println(longStr.getBytes("UTF-8").length)
   
    import spark.implicits._
    val df1 = spark.sparkContext.parallelize(0 to 1000)
      .map(x => ("a", x.toString(), longStr, x, x.toLong, x * 2))
      .toDF("stringField1", "stringField2", "stringField3", "intField",
"longField", "int2Field")
     
    val df2 = spark.sparkContext.parallelize(1001 to 2000)
      .map(x => ("b", x.toString(), (x % 2).toString(), x, x.toLong, x * 2))
      .toDF("stringField1", "stringField2", "stringField3", "intField",
"longField", "int2Field")
     
    val df3 = df1.union(df2)
    val tableName = "study_carbondata_test"
    spark.sql(s"DROP TABLE IF EXISTS ${tableName} ").show()
    val sortScope = "LOCAL_SORT"   // LOCAL_SORT   GLOBAL_SORT
    spark.sql(s"""
        |  CREATE TABLE IF NOT EXISTS ${tableName} (
        |    stringField1          string,
        |    stringField2          string,
        |    stringField3          string,
        |    intField              int,
        |    longField             bigint,
        |    int2Field             int
        |  )
        |  STORED BY 'carbondata'
        |  TBLPROPERTIES('DICTIONARY_INCLUDE'='stringField1, stringField2',
        |    'SORT_COLUMNS'='stringField1, stringField2, intField, longField',
        |    'SORT_SCOPE'='${sortScope}',
        |    'NO_INVERTED_INDEX'='stringField3, int2Field',
        |    'TABLE_BLOCKSIZE'='64'
        |  )
       """.stripMargin)
    df3.write
      .format("carbondata")  
      .option("tableName", "study_carbondata_test")
      .option("compress", "true")  // just valid when tempCSV is true
      .option("tempCSV", "false")
      .option("single_pass", "true")
      .mode(SaveMode.Append)
      .save()


The error message:
java.lang.NegativeArraySizeException
        at org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage.getRow(UnsafeCarbonRowPage.java:182)
        at org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeInmemoryHolder.readRow(UnsafeInmemoryHolder.java:63)
        at org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger.startSorting(UnsafeSingleThreadFinalSortFilesMerger.java:114)
        at org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger.startFinalMerge(UnsafeSingleThreadFinalSortFilesMerger.java:81)
        at org.apache.carbondata.processing.newflow.sort.impl.UnsafeParallelReadMergeSorterImpl.sort(UnsafeParallelReadMergeSorterImpl.java:105)
        at org.apache.carbondata.processing.newflow.steps.SortProcessorStepImpl.execute(SortProcessorStepImpl.java:62)
        at org.apache.carbondata.processing.newflow.steps.DataWriterProcessorStepImpl.execute(DataWriterProcessorStepImpl.java:87)
        at org.apache.carbondata.processing.newflow.DataLoadExecutor.execute(DataLoadExecutor.java:51)
        at org.apache.carbondata.spark.rdd.NewDataFrameLoaderRDD$$anon$2.<init>(NewCarbonDataLoadRDD.scala:442)
        at org.apache.carbondata.spark.rdd.NewDataFrameLoaderRDD.internalCompute(NewCarbonDataLoadRDD.scala:405)
        at org.apache.carbondata.spark.rdd.CarbonRDD.compute(CarbonRDD.scala:62)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)

Currently the length of a column is stored as a short type. How can I resolve
this issue if I need to store a long string exceeding 32768 bytes?
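
To illustrate why this shows up as a NegativeArraySizeException (a sketch with
hypothetical numbers, not CarbonData source code): once the byte length is
written as a 2-byte short, any value above 32767 wraps around to a negative
number, and allocating an array of that negative size fails.

    import java.nio.ByteBuffer

    val byteLen = 40000                        // actual UTF-8 length of the column value
    val buf = ByteBuffer.allocate(2 + byteLen)
    buf.putShort(byteLen.toShort)              // 40000 wraps around to -25536
    buf.put(new Array[Byte](byteLen))
    buf.flip()

    val storedLen = buf.getShort()             // reads back -25536
    // new Array[Byte](storedLen) throws java.lang.NegativeArraySizeException,
    // matching the failure in UnsafeCarbonRowPage.getRow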

Thanks.



--
Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/
Re: If the byte length of a column exceeds 32768, data loading fails when enable.unsafe.sort=true

ravipesala
Hi,

Currently it is not possible to store a column whose length exceeds the short
limit. To support this we might need to introduce a new datatype,
varchar(size). You can raise a JIRA for this new feature, but we will
prioritize it accordingly.
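
As a workaround sketch in the meantime (the original report sees the failure
only with enable.unsafe.sort=true; this assumes the standard CarbonProperties
API and is not verified on 1.2), you can disable unsafe sort before the load
so the off-heap sort path that stores the column length as a short is not used:

    import org.apache.carbondata.core.util.CarbonProperties

    // Possible workaround: fall back to the on-heap sort path for loads that
    // contain string columns longer than 32767 bytes. Set before the load.
    CarbonProperties.getInstance()
      .addProperty("enable.unsafe.sort", "false")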

Regards,
Ravindra.

On 25 October 2017 at 11:34, xm_zzc <[hidden email]> wrote:

> Hi, dev:
>   I am using Spark 2.1 + CarbonData 1.2, and I find that if
> enable.unsafe.sort=true and the byte length of a column exceeds 32768, the
> data load fails.
> [...]



--
Thanks & Regards,
Ravi