Vandana Yadav created CARBONDATA-2147:
----------------------------------------- Summary: Exception displays while loading data with streaming Key: CARBONDATA-2147 URL: https://issues.apache.org/jira/browse/CARBONDATA-2147 Project: CarbonData Issue Type: Bug Components: data-load Affects Versions: 1.3.0 Environment: spark 2.1, spark 2.2.1 Reporter: Vandana Yadav Exception displays while loading data with streaming Steps to reproduce: 1) start spark-shell: ./spark-shell --jars /opt/spark/spark-2.2.1/carbonlib/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar 2) Execute following script: import org.apache.spark.sql.SparkSession import org.apache.spark.sql.CarbonSession._ import org.apache.carbondata.core.util.CarbonProperties import org.apache.spark.sql.streaming.\{ProcessingTime, StreamingQuery} val carbon = SparkSession.builder().config(sc.getConf) .getOrCreateCarbonSession("hdfs://localhost:54310/newCarbonStore","/tmp") import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE") carbon.sql("drop table if exists uniqdata_stream") carbon.sql("create table uniqdata_stream(CUST_ID int,CUST_NAME String,DOB timestamp,DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('TABLE_BLOCKSIZE'= '256 MB', 'streaming'='true')"); import carbon.sqlContext.implicits._ import org.apache.spark.sql.types._ val uniqdataSch = StructType( Array(StructField("CUST_ID", IntegerType),StructField("CUST_NAME", StringType),StructField("DOB", TimestampType), StructField("DOJ", TimestampType), StructField("BIGINT_COLUMN1", LongType), StructField("BIGINT_COLUMN2", LongType), StructField("DECIMAL_COLUMN1", org.apache.spark.sql.types.DecimalType(30, 10)), StructField("DECIMAL_COLUMN2", org.apache.spark.sql.types.DecimalType(36,10)), StructField("Double_COLUMN1", DoubleType), StructField("Double_COLUMN2", DoubleType), StructField("INTEGER_COLUMN1", IntegerType))) val streamDf = carbon.readStream .schema(uniqdataSch) .option("sep", ",") .csv("file:///home/knoldus/Documents/uniqdata") val qry = streamDf.writeStream.format("carbondata").trigger(ProcessingTime("5 seconds")) .option("checkpointLocation","/stream/uniq") .option("dbName", "default") .option("tableName", "uniqdata_stream") .start() 3) Error logs: warning: there was one deprecation warning; re-run with -deprecation for details uniqdataSch: org.apache.spark.sql.types.StructType = StructType(StructField(CUST_ID,IntegerType,true), StructField(CUST_NAME,StringType,true), StructField(DOB,TimestampType,true), StructField(DOJ,TimestampType,true), StructField(BIGINT_COLUMN1,LongType,true), StructField(BIGINT_COLUMN2,LongType,true), StructField(DECIMAL_COLUMN1,DecimalType(30,10),true), StructField(DECIMAL_COLUMN2,DecimalType(36,10),true), StructField(Double_COLUMN1,DoubleType,true), StructField(Double_COLUMN2,DoubleType,true), StructField(INTEGER_COLUMN1,IntegerType,true)) streamDf: org.apache.spark.sql.DataFrame = [CUST_ID: int, CUST_NAME: string ... 9 more fields] qry: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@d0e155c scala> 18/02/08 16:38:53 ERROR StreamSegment: Executor task launch worker for task 5 Failed to append batch data to stream segment: hdfs://localhost:54310/newCarbonStore/default/uniqdata_stream1/Fact/Part0/Segment_0 java.lang.NullPointerException at org.apache.spark.sql.catalyst.InternalRow.getString(InternalRow.scala:32) at org.apache.carbondata.streaming.parser.CSVStreamParserImp.parserRow(CSVStreamParserImp.java:40) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:337) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:331) at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:108) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 18/02/08 16:38:53 ERROR Utils: Aborting task java.lang.NullPointerException at org.apache.spark.sql.catalyst.InternalRow.getString(InternalRow.scala:32) at org.apache.carbondata.streaming.parser.CSVStreamParserImp.parserRow(CSVStreamParserImp.java:40) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:337) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:331) at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:108) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 18/02/08 16:38:53 ERROR CarbonAppendableStreamSink$: Executor task launch worker for task 5 Job job_20180208163853_0005 aborted. 18/02/08 16:38:53 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 5) org.apache.carbondata.streaming.CarbonStreamException: Task failed while writing rows at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:324) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:108) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.NullPointerException at org.apache.spark.sql.catalyst.InternalRow.getString(InternalRow.scala:32) at org.apache.carbondata.streaming.parser.CSVStreamParserImp.parserRow(CSVStreamParserImp.java:40) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:337) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:331) at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317) ... 8 more -- This message was sent by Atlassian JIRA (v7.6.3#76005) |
Free forum by Nabble | Edit this page |