[ https://issues.apache.org/jira/browse/CARBONDATA-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356870#comment-16356870 ] Zhichao Zhang commented on CARBONDATA-2147: -------------------------------------------- [~Vandana7] I can resolve this issue; the default parser 'CSVStreamParserImp' causes this problem. > Exception displays while loading data with streaming > ---------------------------------------------------- > > Key: CARBONDATA-2147 > URL: https://issues.apache.org/jira/browse/CARBONDATA-2147 > Project: CarbonData > Issue Type: Bug > Components: data-load > Affects Versions: 1.3.0 > Environment: spark 2.1, spark 2.2.1 > Reporter: Vandana Yadav > Priority: Minor > > Exception displays while loading data with streaming > Steps to reproduce: > 1) Start spark-shell: > ./spark-shell --jars /opt/spark/spark-2.2.1/carbonlib/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar > 2) Execute the following script: > import org.apache.spark.sql.SparkSession > import org.apache.spark.sql.CarbonSession._ > import org.apache.carbondata.core.util.CarbonProperties > import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery} > val carbon = SparkSession.builder().config(sc.getConf) .getOrCreateCarbonSession("hdfs://localhost:54310/newCarbonStore","/tmp") > import org.apache.carbondata.core.constants.CarbonCommonConstants > import org.apache.carbondata.core.util.CarbonProperties > CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE") > carbon.sql("drop table if exists uniqdata_stream") > carbon.sql("create table uniqdata_stream(CUST_ID int,CUST_NAME String,DOB timestamp,DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('TABLE_BLOCKSIZE'= '256 MB', 'streaming'='true')"); > import 
carbon.sqlContext.implicits._ > import org.apache.spark.sql.types._ > val uniqdataSch = StructType( > Array(StructField("CUST_ID", IntegerType),StructField("CUST_NAME", StringType),StructField("DOB", TimestampType), StructField("DOJ", TimestampType), StructField("BIGINT_COLUMN1", LongType), StructField("BIGINT_COLUMN2", LongType), StructField("DECIMAL_COLUMN1", org.apache.spark.sql.types.DecimalType(30, 10)), StructField("DECIMAL_COLUMN2", org.apache.spark.sql.types.DecimalType(36,10)), StructField("Double_COLUMN1", DoubleType), StructField("Double_COLUMN2", DoubleType), StructField("INTEGER_COLUMN1", IntegerType))) > val streamDf = carbon.readStream > .schema(uniqdataSch) > .option("sep", ",") > .csv("file:///home/knoldus/Documents/uniqdata") > val qry = streamDf.writeStream.format("carbondata").trigger(ProcessingTime("5 seconds")) > .option("checkpointLocation","/stream/uniq") > .option("dbName", "default") > .option("tableName", "uniqdata_stream") > .start() > > 3) Error logs: > warning: there was one deprecation warning; re-run with -deprecation for details > uniqdataSch: org.apache.spark.sql.types.StructType = StructType(StructField(CUST_ID,IntegerType,true), StructField(CUST_NAME,StringType,true), StructField(DOB,TimestampType,true), StructField(DOJ,TimestampType,true), StructField(BIGINT_COLUMN1,LongType,true), StructField(BIGINT_COLUMN2,LongType,true), StructField(DECIMAL_COLUMN1,DecimalType(30,10),true), StructField(DECIMAL_COLUMN2,DecimalType(36,10),true), StructField(Double_COLUMN1,DoubleType,true), StructField(Double_COLUMN2,DoubleType,true), StructField(INTEGER_COLUMN1,IntegerType,true)) > streamDf: org.apache.spark.sql.DataFrame = [CUST_ID: int, CUST_NAME: string ... 
9 more fields] > qry: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@d0e155c > scala> 18/02/08 16:38:53 ERROR StreamSegment: Executor task launch worker for task 5 Failed to append batch data to stream segment: hdfs://localhost:54310/newCarbonStore/default/uniqdata_stream1/Fact/Part0/Segment_0 > java.lang.NullPointerException > at org.apache.spark.sql.catalyst.InternalRow.getString(InternalRow.scala:32) > at org.apache.carbondata.streaming.parser.CSVStreamParserImp.parserRow(CSVStreamParserImp.java:40) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:337) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:331) > at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305) > at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227) > at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:108) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > 18/02/08 16:38:53 ERROR Utils: Aborting task > java.lang.NullPointerException > at org.apache.spark.sql.catalyst.InternalRow.getString(InternalRow.scala:32) > at org.apache.carbondata.streaming.parser.CSVStreamParserImp.parserRow(CSVStreamParserImp.java:40) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:337) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:331) > at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305) > at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228) > at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:108) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > 18/02/08 16:38:53 ERROR CarbonAppendableStreamSink$: Executor task launch worker for task 5 Job job_20180208163853_0005 aborted. > 18/02/08 16:38:53 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 5) > org.apache.carbondata.streaming.CarbonStreamException: Task failed while writing rows > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:324) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:108) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.NullPointerException > at org.apache.spark.sql.catalyst.InternalRow.getString(InternalRow.scala:32) > at org.apache.carbondata.streaming.parser.CSVStreamParserImp.parserRow(CSVStreamParserImp.java:40) > at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:337) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:331) > at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305) > at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317) > ... 8 more -- This message was sent by Atlassian JIRA (v7.6.3#76005) |
Free forum by Nabble | Edit this page |