[ https://issues.apache.org/jira/browse/CARBONDATA-1625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhichao Zhang updated CARBONDATA-1625:
---------------------------------------
Description:
I am using Spark 2.1 + CarbonData 1.2 and find that when enable.unsafe.sort=true and the UTF-8 byte length of a column value exceeds 32768, data loading fails.

My test code:

{code:java}
val longStr = sb.toString() // the getBytes length of longStr exceeds 32768
println(longStr.length())
println(longStr.getBytes("UTF-8").length)

import spark.implicits._
val df1 = spark.sparkContext.parallelize(0 to 1000)
  .map(x => ("a", x.toString(), longStr, x, x.toLong, x * 2))
  .toDF("stringField1", "stringField2", "stringField3", "intField", "longField", "int2Field")

val df2 = spark.sparkContext.parallelize(1001 to 2000)
  .map(x => ("b", x.toString(), (x % 2).toString(), x, x.toLong, x * 2))
  .toDF("stringField1", "stringField2", "stringField3", "intField", "longField", "int2Field")

val df3 = df1.union(df2)
val tableName = "study_carbondata_test"
spark.sql(s"DROP TABLE IF EXISTS ${tableName} ").show()
val sortScope = "LOCAL_SORT" // LOCAL_SORT or GLOBAL_SORT
spark.sql(s"""
   | CREATE TABLE IF NOT EXISTS ${tableName} (
   |   stringField1 string,
   |   stringField2 string,
   |   stringField3 string,
   |   intField int,
   |   longField bigint,
   |   int2Field int
   | )
   | STORED BY 'carbondata'
   | TBLPROPERTIES('DICTIONARY_INCLUDE'='stringField1, stringField2',
   |   'SORT_COLUMNS'='stringField1, stringField2, intField, longField',
   |   'SORT_SCOPE'='${sortScope}',
   |   'NO_INVERTED_INDEX'='stringField3, int2Field',
   |   'TABLE_BLOCKSIZE'='64'
   | )
  """.stripMargin)
df3.write
  .format("carbondata")
  .option("tableName", "study_carbondata_test")
  .option("compress", "true") // only valid when tempCSV is true
  .option("tempCSV", "false")
  .option("single_pass", "true")
  .mode(SaveMode.Append)
  .save()
{code}

The error message:

{code:java}
java.lang.NegativeArraySizeException
    at org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage.getRow(UnsafeCarbonRowPage.java:182)
    at org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeInmemoryHolder.readRow(UnsafeInmemoryHolder.java:63)
    at org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger.startSorting(UnsafeSingleThreadFinalSortFilesMerger.java:114)
    at org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger.startFinalMerge(UnsafeSingleThreadFinalSortFilesMerger.java:81)
    at org.apache.carbondata.processing.newflow.sort.impl.UnsafeParallelReadMergeSorterImpl.sort(UnsafeParallelReadMergeSorterImpl.java:105)
    at org.apache.carbondata.processing.newflow.steps.SortProcessorStepImpl.execute(SortProcessorStepImpl.java:62)
    at org.apache.carbondata.processing.newflow.steps.DataWriterProcessorStepImpl.execute(DataWriterProcessorStepImpl.java:87)
    at org.apache.carbondata.processing.newflow.DataLoadExecutor.execute(DataLoadExecutor.java:51)
    at org.apache.carbondata.spark.rdd.NewDataFrameLoaderRDD$$anon$2.<init>(NewCarbonDataLoadRDD.scala:442)
    at org.apache.carbondata.spark.rdd.NewDataFrameLoaderRDD.internalCompute(NewCarbonDataLoadRDD.scala:405)
    at org.apache.carbondata.spark.rdd.CarbonRDD.compute(CarbonRDD.scala:62)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
{code}

Currently the byte length of a column value is stored as a short. Introduce a new varchar(size) data type so that column values longer than the short limit can be stored.
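The NegativeArraySizeException above is consistent with a byte length being written and read back through a signed short. A minimal standalone sketch (not CarbonData code, values chosen only for illustration) of how such an overflow produces a negative array size:

{code:java}
// Minimal sketch, not CarbonData code: a byte length stored in a signed short
// wraps to a negative value once it exceeds Short.MaxValue (32767), and
// allocating a buffer from that value throws NegativeArraySizeException.
object ShortLengthOverflow {
  def main(args: Array[String]): Unit = {
    val byteLength = 40000                        // e.g. longStr.getBytes("UTF-8").length
    val storedLength: Short = byteLength.toShort  // length field persisted as a short
    println(storedLength)                         // prints -25536
    val row = new Array[Byte](storedLength)       // throws java.lang.NegativeArraySizeException
  }
}
{code}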
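For illustration only, a sketch of how the proposed type might be declared, reusing the CREATE TABLE form and the spark/tableName values from the test above. The DDL syntax is an assumption; this issue only proposes a varchar(size) type and does not define its grammar.

{code:java}
// Hypothetical DDL sketch: declare the long column with the proposed
// varchar(size) type so its byte length can exceed the signed-short limit.
// The varchar(100000) syntax is assumed for illustration only.
spark.sql(s"""
   | CREATE TABLE IF NOT EXISTS ${tableName} (
   |   stringField1 string,
   |   stringField2 string,
   |   stringField3 varchar(100000),
   |   intField int,
   |   longField bigint,
   |   int2Field int
   | )
   | STORED BY 'carbondata'
  """.stripMargin)
{code}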

> Introduce new datatype of varchar(size) to store column length more than short limit.
> --------------------------------------------------------------------------------------
>
>                 Key: CARBONDATA-1625
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-1625
>             Project: CarbonData
>          Issue Type: New Feature
>          Components: file-format
>            Reporter: Zhichao Zhang
>            Priority: Minor
>

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)