Throw NullPointerException occasionally when query from stream table


Throw NullPointerException occasionally when query from stream table

xm_zzc
Hi:
  I ran a structured streaming app in local[4] mode (Spark 2.3.2 +
CarbonData master branch) to insert data, and then started a thread to
execute select SQL; a 'NullPointerException' occurred occasionally.
  *I found that the smaller the value of CarbonCommonConstants.HANDOFF_SIZE
is, the more easily the error occurs.*
  Please see my test code: CarbonStream1_5.scala
<http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/file/t133/CarbonStream1_5.scala>
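
  For reference, a minimal sketch (mine, not taken from the attached test code)
of how the handoff threshold can be lowered via CarbonProperties to reproduce
the problem faster; the 64 MB value is illustrative only:

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // Lower the streaming handoff threshold so handoff (and the race) fires
    // more often; the value below is illustrative, not a recommendation.
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.HANDOFF_SIZE, (64 * 1024 * 1024).toString)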
 
  The NullPointerException is:
  Exception in thread "Thread-42" java.lang.NullPointerException
        at org.apache.carbondata.core.indexstore.blockletindex.BlockDataMap.getFileFooterEntrySchema(BlockDataMap.java:1001)
        at org.apache.carbondata.core.indexstore.blockletindex.BlockDataMap.prune(BlockDataMap.java:656)
        at org.apache.carbondata.core.indexstore.blockletindex.BlockDataMap.prune(BlockDataMap.java:743)
        at org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory.getAllBlocklets(BlockletDataMapFactory.java:391)
        at org.apache.carbondata.core.datamap.TableDataMap.prune(TableDataMap.java:132)
        at org.apache.carbondata.hadoop.api.CarbonInputFormat.getPrunedBlocklets(CarbonInputFormat.java:491)
        at org.apache.carbondata.hadoop.api.CarbonInputFormat.getDataBlocksOfSegment(CarbonInputFormat.java:412)
        at org.apache.carbondata.hadoop.api.CarbonTableInputFormat.getSplits(CarbonTableInputFormat.java:528)
        at org.apache.carbondata.hadoop.api.CarbonTableInputFormat.getSplits(CarbonTableInputFormat.java:219)
        at org.apache.carbondata.spark.rdd.CarbonScanRDD.internalGetPartitions(CarbonScanRDD.scala:127)
        at org.apache.carbondata.spark.rdd.CarbonRDD.getPartitions(CarbonRDD.scala:67)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
        at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:321)
        at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.doExecute(limit.scala:154)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
        at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:337)
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278)
        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
        at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2489)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2703)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:725)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:702)
        at cn.xm.zzc.carbonmaster.CarbonStream1_5$$anon$1$$anonfun$run$1.apply$mcVI$sp(CarbonStream1_5.scala:202)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:166)
        at cn.xm.zzc.carbonmaster.CarbonStream1_5$$anon$1.run(CarbonStream1_5.scala:200)
       
        *BTW, the index files were not merged after handoff; the handoff
operation does not support merging index files yet, right?*
 




Re: Throw NullPointerException occasionally when query from stream table

xm_zzc
Hi:
  I added some logs to trace this problem, and found that when
BlockDataMap.getFileFooterEntrySchema was called, the key
'segmentPropertiesIndex' stored in the BlockDataMap instance had already been
removed by another thread from
SegmentPropertiesAndSchemaHolder.indexToSegmentPropertiesWrapperMapping:

2018-10-31 14:49:24,967
datastore.block.SegmentPropertiesAndSchemaHolder.addSegmentProperties(SegmentPropertiesAndSchemaHolder.java:115)
Thread-39 -========put 37 into indexToSegmentPropertiesWrapperMapping 0

2018-10-31 14:49:25,472
datastore.block.SegmentPropertiesAndSchemaHolder.invalidate(SegmentPropertiesAndSchemaHolder.java:243)
Executor task launch worker for task 926 -========remove 37 out of
indexToSegmentPropertiesWrapperMapping 31

2018-10-31 14:49:25,486
indexstore.blockletindex.BlockDataMap.getFileFooterEntrySchema(BlockDataMap.java:1002)
Thread-39 -========get 37 null

The key segmentPropertiesIndex=37 was removed at 2018-10-31 14:49:25,472,
between the put at 14:49:24,967 and the failed get at 14:49:25,486.

2018-10-31 14:56:45,057
datastore.block.SegmentPropertiesAndSchemaHolder.addSegmentProperties(SegmentPropertiesAndSchemaHolder.java:115)
Thread-39 -========put 98 into indexToSegmentPropertiesWrapperMapping 0

2018-10-31 14:56:45,477
datastore.block.SegmentPropertiesAndSchemaHolder.invalidate(SegmentPropertiesAndSchemaHolder.java:243)
Executor task launch worker for task 2653 -========remove 98 out of
indexToSegmentPropertiesWrapperMapping 67

2018-10-31 14:56:46,290
indexstore.blockletindex.BlockDataMap.getFileFooterEntrySchema(BlockDataMap.java:1002)
Thread-39 -========get 98 null

2018-10-31 14:56:51,392
indexstore.blockletindex.BlockDataMap.getFileFooterEntrySchema(BlockDataMap.java:1002)
Thread-39 -========get 98 null
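
  The put -> remove -> get-null interleaving in these logs can be replayed
with a tiny two-thread model (illustrative Scala only; the map merely stands
in for indexToSegmentPropertiesWrapperMapping, this is not CarbonData code):

    import java.util.concurrent.ConcurrentHashMap

    // Replays the interleaving from the logs above on a plain shared map.
    object InterleavingReplay extends App {
      val mapping = new ConcurrentHashMap[Integer, String]()
      mapping.put(37, "segmentPropertiesWrapper")             // query thread: put 37
      val handoff = new Thread(() => { mapping.remove(37) })  // handoff thread: remove 37
      handoff.start()
      handoff.join()
      println(mapping.get(37))                                // query thread: get 37 -> null
    }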





Re: Throw NullPointerException occasionally when query from stream table

xm_zzc
Hi:
  The root cause is that when a select SQL statement executes, BlockDataMap
calls 'SegmentPropertiesAndSchemaHolder.addSegmentProperties' to add segment
info one by one. Meanwhile, if some segments are updated (for example, a
stream segment is handed off), the handoff thread calls
'SegmentPropertiesAndSchemaHolder.invalidate' to delete segment info one by
one; when segmentIdAndSegmentPropertiesIndexWrapper.segmentIdSet.isEmpty() is
true, it removes the segmentPropertiesIndex. But the select thread is still
using that segmentPropertiesIndex to add/get segment info, and then the NPE
occurs.
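
  One hypothetical direction for a fix, sketched below as a standalone model
(this is not the project's actual change): make the lookup tolerant of a
concurrent invalidate by atomically re-registering the wrapper instead of
dereferencing null:

    import java.util.concurrent.ConcurrentHashMap

    // Hypothetical mitigation sketch, not CarbonData's actual fix: a holder
    // whose lookup rebuilds a removed entry instead of returning null.
    class SafeHolder[V](reload: Int => V) {
      private val map = new ConcurrentHashMap[Integer, V]()

      def put(index: Int, value: V): Unit = map.put(index, value)

      // handoff thread: removes the entry once its last segment is gone
      def invalidate(index: Int): Unit = map.remove(index)

      // query thread: if invalidate() raced in between, computeIfAbsent
      // atomically re-registers the wrapper instead of yielding null
      def getOrReload(index: Int): V =
        map.computeIfAbsent(index, (i: Integer) => reload(i.intValue))
    }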




Re: Throw NullPointerException occasionally when query from stream table

David CaiQiang
Where do we call SegmentPropertiesAndSchemaHolder.invalidate in the handoff
thread?



-----
Best Regards
David Cai

Re: Throw NullPointerException occasionally when query from stream table

manishgupta88
Hi xm_zzc

As I can see from the logs and the code flow, the handoff code clears the
cache from executor code, while the exception is thrown from driver code
during the query.
You are getting the exception because you are using local mode, wherein the
executor and driver run in the same JVM. We will check how we can avoid the
exception in the local-mode scenario. Meanwhile, you can continue your
testing in cluster mode.
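
For context, a minimal sketch of the difference (the app name is hypothetical):

    import org.apache.spark.sql.SparkSession

    // Illustrative only: with master("local[4]") the driver and executors
    // share one JVM, so an executor-side cache invalidation also clears the
    // entry a driver-side query is using. With a cluster master (e.g. "yarn"),
    // executor JVMs are separate from the driver, so the driver's DataMap
    // cache is not touched by the handoff.
    val spark = SparkSession.builder()
      .appName("carbon-stream-npe-repro") // hypothetical name
      .master("local[4]")
      .getOrCreate()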

Regards
Manish Gupta


