[ https://issues.apache.org/jira/browse/CARBONDATA-1991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

anubhav tarar updated CARBONDATA-1991:
--------------------------------------

Description:

Start the Spark shell with the CarbonData jar:

./bin/spark-shell --jars carbonlib/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar

Execute the following code to create a CarbonSession and a streaming table:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._

val carbon = SparkSession.builder().config(sc.getConf)
  .getOrCreateCarbonSession("hdfs://localhost:54311/newCarbonStore", "/tmp")

carbon.sql("drop table if exists uniqdata_stream")
carbon.sql("create table uniqdata_stream(CUST_ID int, CUST_NAME String, DOB timestamp, DOJ timestamp, BIGINT_COLUMN1 bigint, BIGINT_COLUMN2 bigint, DECIMAL_COLUMN1 decimal(30,10), DECIMAL_COLUMN2 decimal(36,10), Double_COLUMN1 double, Double_COLUMN2 double, INTEGER_COLUMN1 int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('TABLE_BLOCKSIZE'='256 MB', 'streaming'='true')")
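As an optional sanity check (not part of the original report, and assuming DESCRIBE FORMATTED lists table properties as it does in standard Spark SQL), the streaming flag can be verified before wiring up the stream:

// The Table Properties section of the output should include 'streaming'='true'
carbon.sql("describe formatted uniqdata_stream").show(100, false)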
Define a streaming source on a local socket:

import carbon.sqlContext.implicits._

val lines = carbon.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

Now start a server socket in another terminal:

nc -lk 9999

and write the 2000_UniqData.csv data to that terminal.

Then execute the following in the spark shell:

val qry = lines.writeStream
  .format("carbondata")
  .option("checkpointLocation", "/newtmp1")
  .option("dbName", "default")
  .option("tableName", "uniqdata_stream")
  .start()

qry.awaitTermination()

When you are done writing data, stop the socket and the spark-shell. Then start the spark-shell again and run the following query:

carbon.sql("select * from uniqdata_stream").show

It throws the following exception:

18/01/05 18:25:04 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.ClassCastException: java.math.BigDecimal cannot be cast to org.apache.spark.sql.types.Decimal
  at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordReader.putRowToColumnBatch(CarbonStreamRecordReader.java:702)
  at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordReader.scanBlockletAndFillVector(CarbonStreamRecordReader.java:424)
  at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordReader.nextColumnarBatch(CarbonStreamRecordReader.java:324)
  at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordReader.nextKeyValue(CarbonStreamRecordReader.java:305)
  at org.apache.carbondata.spark.rdd.CarbonScanRDD$$anon$1.hasNext(CarbonScanRDD.scala:370)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:99)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
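One way to narrow this down — an extra diagnostic, not part of the original report, and assuming the column projection is pushed down to the streaming reader — is to select only non-decimal columns; if the query below succeeds, the failure is confined to the decimal fill path in putRowToColumnBatch:

// Diagnostic only: DECIMAL_COLUMN1/DECIMAL_COLUMN2 are left out, so the
// decimal branch of the column-vector fill is never exercised.
carbon.sql("select CUST_ID, CUST_NAME, BIGINT_COLUMN1, Double_COLUMN1 from uniqdata_stream").show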
> Select query from a streaming table throws ClassCastException
> --------------------------------------------------------------
>
>                 Key: CARBONDATA-1991
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-1991
>             Project: CarbonData
>          Issue Type: Bug
>          Components: data-query
>    Affects Versions: 1.3.0
>         Environment: spark2.1
>            Reporter: Geetika Gupta
>            Assignee: anubhav tarar
>             Fix For: 1.3.0
>
>         Attachments: 2000_UniqData.csv
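The trace points at the likely root cause: CarbonStreamRecordReader.putRowToColumnBatch receives the decoded decimal value as a java.math.BigDecimal and casts it directly to org.apache.spark.sql.types.Decimal, the type Spark's columnar batch stores. A minimal Scala sketch of the conversion that appears to be missing — the real reader is Java, and toSparkDecimal is an illustrative name, not the actual code:

import java.math.{BigDecimal => JBigDecimal}
import org.apache.spark.sql.types.Decimal

// A plain cast reproduces the reported error:
//   value.asInstanceOf[Decimal]   // ClassCastException for java.math.BigDecimal
// Wrapping with Decimal.apply converts instead of casting:
def toSparkDecimal(value: Any): Decimal = value match {
  case j: JBigDecimal => Decimal(j)  // convert java.math.BigDecimal to Spark's Decimal
  case d: Decimal     => d           // already the expected type
}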