Hi:
I ran a Structured Streaming app in local[4] mode (Spark 2.3.2 + CarbonData master branch) to insert data, and then started a thread to execute a select SQL; a 'NullPointerException' occurred occasionally. *I found that the smaller the value of CarbonCommonConstants.HANDOFF_SIZE, the more easily the error occurs.* Please see my test code: CarbonStream1_5.scala <http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/file/t133/CarbonStream1_5.scala>

The NullPointerException is:

Exception in thread "Thread-42" java.lang.NullPointerException
    at org.apache.carbondata.core.indexstore.blockletindex.BlockDataMap.getFileFooterEntrySchema(BlockDataMap.java:1001)
    at org.apache.carbondata.core.indexstore.blockletindex.BlockDataMap.prune(BlockDataMap.java:656)
    at org.apache.carbondata.core.indexstore.blockletindex.BlockDataMap.prune(BlockDataMap.java:743)
    at org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory.getAllBlocklets(BlockletDataMapFactory.java:391)
    at org.apache.carbondata.core.datamap.TableDataMap.prune(TableDataMap.java:132)
    at org.apache.carbondata.hadoop.api.CarbonInputFormat.getPrunedBlocklets(CarbonInputFormat.java:491)
    at org.apache.carbondata.hadoop.api.CarbonInputFormat.getDataBlocksOfSegment(CarbonInputFormat.java:412)
    at org.apache.carbondata.hadoop.api.CarbonTableInputFormat.getSplits(CarbonTableInputFormat.java:528)
    at org.apache.carbondata.hadoop.api.CarbonTableInputFormat.getSplits(CarbonTableInputFormat.java:219)
    at org.apache.carbondata.spark.rdd.CarbonScanRDD.internalGetPartitions(CarbonScanRDD.scala:127)
    at org.apache.carbondata.spark.rdd.CarbonRDD.getPartitions(CarbonRDD.scala:67)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:321)
    at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.doExecute(limit.scala:154)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
    at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:337)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
    at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2489)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2703)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:725)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:702)
    at cn.xm.zzc.carbonmaster.CarbonStream1_5$$anon$1$$anonfun$run$1.apply$mcVI$sp(CarbonStream1_5.scala:202)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:166)
    at cn.xm.zzc.carbonmaster.CarbonStream1_5$$anon$1.run(CarbonStream1_5.scala:200)

*BTW, the index files were not merged after handoff; the handoff operation does not support merging index files now, right?*
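For reference, here is a minimal sketch of the concurrent-select pattern in question (the table name, column name, and loop bounds are illustrative assumptions, not copied from CarbonStream1_5.scala; the streaming insert and handoff are assumed to run elsewhere in the app):

    import org.apache.spark.sql.SparkSession

    object ConcurrentSelectSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[4]")
          .appName("carbon-npe-repro")
          .getOrCreate()

        // Query thread: repeatedly selects from the streaming table while
        // inserts and handoff happen in the background.
        val selectThread = new Thread(new Runnable {
          override def run(): Unit = {
            for (_ <- 1 to 1000) {
              // The NPE surfaces inside getSplits/prune while show() runs,
              // whenever a handoff invalidates the segment properties in use.
              spark.sql("SELECT * FROM stream_table ORDER BY id").show()
              Thread.sleep(500)
            }
          }
        })
        selectThread.start()
        selectThread.join()
      }
    }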
Hi:
I added some logs to trace this problem and found that when BlockDataMap.getFileFooterEntrySchema is called, the key 'segmentPropertiesIndex' stored in the BlockDataMap instance has already been removed from SegmentPropertiesAndSchemaHolder.indexToSegmentPropertiesWrapperMapping by another thread:

2018-10-31 14:49:24,967 datastore.block.SegmentPropertiesAndSchemaHolder.addSegmentProperties(SegmentPropertiesAndSchemaHolder.java:115) Thread-39 -========put 37 into indexToSegmentPropertiesWrapperMapping 0
2018-10-31 14:49:25,472 datastore.block.SegmentPropertiesAndSchemaHolder.invalidate(SegmentPropertiesAndSchemaHolder.java:243) Executor task launch worker for task 926 -========remove 37 out of indexToSegmentPropertiesWrapperMapping 31
2018-10-31 14:49:25,486 indexstore.blockletindex.BlockDataMap.getFileFooterEntrySchema(BlockDataMap.java:1002) Thread-39 -========get 37 null

The key segmentPropertiesIndex=37 was removed at 2018-10-31 14:49:25,472, before the get at 14:49:25,486. The same pattern repeats later:

2018-10-31 14:56:45,057 datastore.block.SegmentPropertiesAndSchemaHolder.addSegmentProperties(SegmentPropertiesAndSchemaHolder.java:115) Thread-39 -========put 98 into indexToSegmentPropertiesWrapperMapping 0
2018-10-31 14:56:45,477 datastore.block.SegmentPropertiesAndSchemaHolder.invalidate(SegmentPropertiesAndSchemaHolder.java:243) Executor task launch worker for task 2653 -========remove 98 out of indexToSegmentPropertiesWrapperMapping 67
2018-10-31 14:56:46,290 indexstore.blockletindex.BlockDataMap.getFileFooterEntrySchema(BlockDataMap.java:1002) Thread-39 -========get 98 null
2018-10-31 14:56:51,392 indexstore.blockletindex.BlockDataMap.getFileFooterEntrySchema(BlockDataMap.java:1002) Thread-39 -========get 98 null
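For anyone wanting to reproduce this trace, the instrumentation amounts to logging the map size around put/remove/get; a rough paraphrase of what such log statements could look like (this is not the exact code added to SegmentPropertiesAndSchemaHolder):

    import java.util.concurrent.ConcurrentHashMap
    import org.slf4j.LoggerFactory

    object HolderTraceSketch {
      private val log = LoggerFactory.getLogger(getClass)
      private val mapping = new ConcurrentHashMap[Int, AnyRef]()

      def put(index: Int, wrapper: AnyRef): Unit = {
        mapping.put(index, wrapper)
        log.info(s"put $index into indexToSegmentPropertiesWrapperMapping ${mapping.size()}")
      }

      def remove(index: Int): Unit = {
        mapping.remove(index)
        log.info(s"remove $index out of indexToSegmentPropertiesWrapperMapping ${mapping.size()}")
      }

      def get(index: Int): AnyRef = {
        val wrapper = mapping.get(index)
        log.info(s"get $index ${if (wrapper == null) "null" else "ok"}")
        wrapper
      }
    }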
Hi:
The root cause is that when a select SQL is executed, BlockDataMap calls 'SegmentPropertiesAndSchemaHolder.addSegmentProperties' to add segment info one by one. Meanwhile, if some segments are updated (for example, a streaming segment is handed off), the handoff thread calls 'SegmentPropertiesAndSchemaHolder.invalidate' to delete segment info one by one; when segmentIdAndSegmentPropertiesIndexWrapper.segmentIdSet.isEmpty() is true, it removes the segmentPropertiesIndex entry. But the select thread is still using that segmentPropertiesIndex to add/get segment info, and then the NPE occurs.
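To make the interleaving concrete, here is a minimal standalone sketch of the race (plain Scala, with a ConcurrentHashMap standing in for indexToSegmentPropertiesWrapperMapping; this is not CarbonData code):

    import java.util.concurrent.ConcurrentHashMap

    object InvalidateRaceSketch {
      // Stand-in for SegmentPropertiesAndSchemaHolder.indexToSegmentPropertiesWrapperMapping.
      private val indexToWrapper = new ConcurrentHashMap[Int, String]()

      def main(args: Array[String]): Unit = {
        indexToWrapper.put(37, "segmentPropertiesWrapper")

        // Handoff path: segmentIdSet became empty, so the index entry is removed.
        val handoff = new Thread(new Runnable {
          override def run(): Unit = indexToWrapper.remove(37)
        })

        // Select path: still holds segmentPropertiesIndex = 37 and dereferences
        // whatever the map returns, like getFileFooterEntrySchema does.
        val select = new Thread(new Runnable {
          override def run(): Unit = {
            val wrapper = indexToWrapper.get(37) // null if the remove won the race
            println(wrapper.length)              // NullPointerException in that case
          }
        })

        handoff.start()
        select.start()
        handoff.join()
        select.join()
      }
    }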
Where do we call SegmentPropertiesAndSchemaHolder.invalidate in the handoff thread?

Best Regards
David Cai
Hi David:
Please see the call stack: <http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/file/t133/QQ%E6%88%AA%E5%9B%BE20181106140118.png>
Hi xm_zzc
As I can see from the logs and the code flow, the handoff code clears the cache from executor code, while the exception is thrown from driver code during the query. You are getting the exception because you are using local mode, wherein the executor and driver run in the same JVM. We will check how we can avoid the exception in the local-mode scenario. Meanwhile, you can continue your testing in cluster mode.

Regards
Manish Gupta
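As a footnote to the discussion: one possible defensive direction (a sketch under assumptions on my side, not the actual CarbonData fix) would be to detect the concurrent invalidation explicitly and fail fast with a retriable error rather than dereferencing a null wrapper:

    import java.util.concurrent.ConcurrentHashMap

    object SafeLookupSketch {
      private val indexToWrapper = new ConcurrentHashMap[Int, AnyRef]()

      // Hypothetical guard: method name and exception type are assumptions.
      def getWrapperOrFail(segmentPropertiesIndex: Int): AnyRef = {
        val wrapper = indexToWrapper.get(segmentPropertiesIndex)
        if (wrapper == null) {
          // Surfaces the concurrent invalidation so the caller can retry the query.
          throw new IllegalStateException(
            s"Segment properties index $segmentPropertiesIndex was invalidated concurrently")
        }
        wrapper
      }
    }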