
Data load fails when a column's byte length exceeds 32768 and enable.unsafe.sort=true


Data load fails when a column's byte length exceeds 32768 and enable.unsafe.sort=true

xm_zzc
142 posts
This post was updated on Oct 25, 2017; 6:41am.
Hi, dev:
  I am using Spark 2.1 + CarbonData 1.2, and I find that when enable.unsafe.sort=true and a column's byte length exceeds 32768, the data load fails.

My test code:
    import org.apache.spark.sql.SaveMode

    // Build a string whose UTF-8 byte length exceeds 32768.
    // (In the original test, sb is a StringBuilder filled elsewhere; the construction
    // below is an assumed equivalent.)
    val sb = new StringBuilder
    (1 to 40000).foreach(_ => sb.append('a'))
    val longStr = sb.toString()  // the getBytes length of longStr exceeds 32768
    println(longStr.length())
    println(longStr.getBytes("UTF-8").length)

    import spark.implicits._
    val df1 = spark.sparkContext.parallelize(0 to 1000)
      .map(x => ("a", x.toString(), longStr, x, x.toLong, x * 2))
      .toDF("stringField1", "stringField2", "stringField3", "intField", "longField", "int2Field")

    val df2 = spark.sparkContext.parallelize(1001 to 2000)
      .map(x => ("b", x.toString(), (x % 2).toString(), x, x.toLong, x * 2))
      .toDF("stringField1", "stringField2", "stringField3", "intField", "longField", "int2Field")

    val df3 = df1.union(df2)
    val tableName = "study_carbondata_test"
    spark.sql(s"DROP TABLE IF EXISTS ${tableName}").show()
    val sortScope = "LOCAL_SORT"   // or GLOBAL_SORT
    spark.sql(s"""
        |  CREATE TABLE IF NOT EXISTS ${tableName} (
        |    stringField1          string,
        |    stringField2          string,
        |    stringField3          string,
        |    intField              int,
        |    longField             bigint,
        |    int2Field             int
        |  )
        |  STORED BY 'carbondata'
        |  TBLPROPERTIES('DICTIONARY_INCLUDE'='stringField1, stringField2',
        |    'SORT_COLUMNS'='stringField1, stringField2, intField, longField',
        |    'SORT_SCOPE'='${sortScope}',
        |    'NO_INVERTED_INDEX'='stringField3, int2Field',
        |    'TABLE_BLOCKSIZE'='64'
        |  )
       """.stripMargin)
    df3.write
      .format("carbondata")
      .option("tableName", "study_carbondata_test")
      .option("compress", "true")  // only takes effect when tempCSV is true
      .option("tempCSV", "false")
      .option("single_pass", "true")
      .mode(SaveMode.Append)
      .save()


The error message:
java.lang.NegativeArraySizeException
        at org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage.getRow(UnsafeCarbonRowPage.java:182)
        at org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeInmemoryHolder.readRow(UnsafeInmemoryHolder.java:63)
        at org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger.startSorting(UnsafeSingleThreadFinalSortFilesMerger.java:114)
        at org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger.startFinalMerge(UnsafeSingleThreadFinalSortFilesMerger.java:81)
        at org.apache.carbondata.processing.newflow.sort.impl.UnsafeParallelReadMergeSorterImpl.sort(UnsafeParallelReadMergeSorterImpl.java:105)
        at org.apache.carbondata.processing.newflow.steps.SortProcessorStepImpl.execute(SortProcessorStepImpl.java:62)
        at org.apache.carbondata.processing.newflow.steps.DataWriterProcessorStepImpl.execute(DataWriterProcessorStepImpl.java:87)
        at org.apache.carbondata.processing.newflow.DataLoadExecutor.execute(DataLoadExecutor.java:51)
        at org.apache.carbondata.spark.rdd.NewDataFrameLoaderRDD$$anon$2.<init>(NewCarbonDataLoadRDD.scala:442)
        at org.apache.carbondata.spark.rdd.NewDataFrameLoaderRDD.internalCompute(NewCarbonDataLoadRDD.scala:405)
        at org.apache.carbondata.spark.rdd.CarbonRDD.compute(CarbonRDD.scala:62)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
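This looks like the column's byte length overflowing a signed short inside the unsafe row page: a length above 32767 wraps to a negative value when it is read back, and allocating the byte array for the row then fails. A minimal sketch of the arithmetic (illustration only, not the actual CarbonData code):

    // Illustration only: a byte length above 32767 stored as a short wraps negative.
    val byteLength = 40000                 // UTF-8 byte length of the long column
    val storedAsShort: Short = byteLength.toShort
    println(storedAsShort)                 // prints -25536
    // new Array[Byte](storedAsShort)      // would throw java.lang.NegativeArraySizeException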

Currently the column length is stored as a short type. How can I resolve this issue if I need to store a long string whose byte length exceeds 32768?

Thanks.



--
Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/

Re: Data load fails when a column's byte length exceeds 32768 and enable.unsafe.sort=true

ravipesala
300 posts
Hi,

Currently it is not possible to store a column whose byte length exceeds the short limit. To support this we might need to introduce a new datatype such as varchar(size). You can raise a JIRA for this new feature, and we will consider it according to priority.
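
As a stopgap, since the failure was only reported with enable.unsafe.sort=true, you could try disabling unsafe sort for the load. This is an untested sketch and assumes the on-heap sort path does not hit the same short-length limit:

    import org.apache.carbondata.core.util.CarbonProperties

    // Untested sketch: fall back to the non-unsafe sort path before loading.
    CarbonProperties.getInstance()
      .addProperty("enable.unsafe.sort", "false")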

Regards,
Ravindra.




--
Thanks & Regards,
Ravi