
[Issue] Bloomfilter datamap

Posted by aaron on Sep 24, 2018; 12:28pm
URL: http://apache-carbondata-dev-mailing-list-archive.168.s1.nabble.com/Issue-Bloomfilter-datamap-tp63254.html

Hi Community,

I found that the BloomFilter datamap, when combined with a pre-aggregate datamap, breaks normal GROUP BY queries and even causes some other queries to fail on the Thrift server side. If I drop the BloomFilter datamap, the queries work again.
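For reference, everything below was run from a CarbonSession in spark-shell. A minimal setup sketch (the store path and master here are placeholders, not my actual environment):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._

// Hypothetical store location; substitute your own.
val storeLocation = "hdfs://namenode:8020/user/hadoop/carbon-store"

val carbon = SparkSession
  .builder()
  .master("local[*]")
  .appName("bloom-datamap-repro")
  .getOrCreateCarbonSession(storeLocation)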
*************************************************************************************
Demo SQL:

CREATE TABLE IF NOT EXISTS store(
  market_code STRING,
  device_code STRING,
  country_code STRING,
  category_id INTEGER,
  product_id LONG,
  date TIMESTAMP,
  est_free_app_download LONG,
  est_paid_app_download LONG,
  est_revenue LONG
)
STORED BY 'carbondata'
TBLPROPERTIES(
  'SORT_COLUMNS'='market_code, device_code, country_code, category_id, date, product_id',
  'NO_INVERTED_INDEX'='est_free_app_download, est_paid_app_download, est_revenue',
  'DICTIONARY_INCLUDE'='market_code, device_code, country_code, category_id, product_id',
  'SORT_SCOPE'='GLOBAL_SORT',
  'CACHE_LEVEL'='BLOCKLET',
  'TABLE_BLOCKSIZE'='256',
  'GLOBAL_SORT_PARTITIONS'='2'
)
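Data was loaded into the table before querying (the actual load statement is omitted here); it was of the usual LOAD DATA form, with a hypothetical CSV path:

carbon.sql(
  s"""
     |LOAD DATA INPATH 'hdfs://namenode:8020/data/store_sample.csv'
     |INTO TABLE store
     |OPTIONS('DELIMITER'=',', 'HEADER'='true')
   """.stripMargin)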


CREATE DATAMAP IF NOT EXISTS agg_by_day ON TABLE store
USING 'timeSeries'
DMPROPERTIES(
  'EVENT_TIME'='date',
  'DAY_GRANULARITY'='1')
AS SELECT date, market_code, device_code, country_code, category_id,
  COUNT(date), COUNT(est_free_app_download), COUNT(est_paid_app_download), COUNT(est_revenue),
  SUM(est_free_app_download), MIN(est_free_app_download), MAX(est_free_app_download),
  SUM(est_paid_app_download), MIN(est_paid_app_download), MAX(est_paid_app_download),
  SUM(est_revenue), MIN(est_revenue), MAX(est_revenue)
FROM store
GROUP BY date, market_code, device_code, country_code, category_id

CREATE DATAMAP IF NOT EXISTS bloomfilter_all_dimensions ON TABLE store
USING 'bloomfilter'
DMPROPERTIES(
  'INDEX_COLUMNS'='market_code, device_code, country_code, category_id, date, product_id',
  'BLOOM_SIZE'='640000',
  'BLOOM_FPP'='0.000001',
  'BLOOM_COMPRESS'='true'
)
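Both datamaps can be confirmed with SHOW DATAMAP before running the query:

carbon.sql("SHOW DATAMAP ON TABLE store").show(truncate = false)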


*************************************************************************************
This is the stack trace:


carbon.time(carbon.sql(
     |       s"""
     |          |SELECT date, market_code, device_code, country_code, category_id, sum(est_free_app_download)
     |          |FROM store
     |          |WHERE date BETWEEN '2016-09-01' AND '2016-09-03' AND device_code='ios-phone' AND country_code='EE' AND category_id=100021 AND product_id IN (590416158, 590437560)
     |          |GROUP BY date, market_code, device_code, country_code, category_id"""
     |         .stripMargin).show(truncate=false)
     |     )
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(date#21, market_code#16, device_code#17, country_code#18, category_id#19, 2)
+- *(1) HashAggregate(keys=[date#21, market_code#16, device_code#17, country_code#18, category_id#19], functions=[partial_sum(est_free_app_download#22L)], output=[date#21, market_code#16, device_code#17, country_code#18, category_id#19, sum#74L])
   +- *(1) CarbonDictionaryDecoder [default_store], IncludeProfile(ArrayBuffer(category_id#19)), CarbonAliasDecoderRelation(), org.apache.spark.sql.CarbonSession@213d5189
      +- *(1) Project [market_code#16, device_code#17, country_code#18, category_id#19, date#21, est_free_app_download#22L]
         +- *(1) FileScan carbondata default.store[category_id#19,market_code#16,country_code#18,device_code#17,est_free_app_download#22L,date#21] PushedFilters: [IsNotNull(date), IsNotNull(device_code), IsNotNull(country_code), IsNotNull(category_id), Greate...

  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
  at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:150)
  at org.apache.spark.sql.CarbonDictionaryDecoder.inputRDDs(CarbonDictionaryDecoder.scala:244)
  at org.apache.spark.sql.execution.BaseLimitExec$class.inputRDDs(limit.scala:62)
  at org.apache.spark.sql.execution.LocalLimitExec.inputRDDs(limit.scala:97)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:337)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3273)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:725)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:702)
  at $anonfun$1.apply$mcV$sp(<console>:39)
  at $anonfun$1.apply(<console>:39)
  at $anonfun$1.apply(<console>:39)
  at org.apache.spark.sql.SparkSession.time(SparkSession.scala:676)
  ... 57 elided
Caused by: java.lang.NullPointerException
  at org.apache.carbondata.datamap.bloom.BloomCoarseGrainDataMap.createQueryModel(BloomCoarseGrainDataMap.java:269)
  at org.apache.carbondata.datamap.bloom.BloomCoarseGrainDataMap.createQueryModel(BloomCoarseGrainDataMap.java:270)
  at org.apache.carbondata.datamap.bloom.BloomCoarseGrainDataMap.createQueryModel(BloomCoarseGrainDataMap.java:270)
  at org.apache.carbondata.datamap.bloom.BloomCoarseGrainDataMap.createQueryModel(BloomCoarseGrainDataMap.java:270)
  at org.apache.carbondata.datamap.bloom.BloomCoarseGrainDataMap.createQueryModel(BloomCoarseGrainDataMap.java:270)
  at org.apache.carbondata.datamap.bloom.BloomCoarseGrainDataMap.prune(BloomCoarseGrainDataMap.java:181)
  at org.apache.carbondata.core.datamap.TableDataMap.prune(TableDataMap.java:136)
  at org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapperImpl.prune(DataMapExprWrapperImpl.java:53)
  at org.apache.carbondata.core.datamap.dev.expr.AndDataMapExprWrapper.prune(AndDataMapExprWrapper.java:51)
  at org.apache.carbondata.core.datamap.dev.expr.AndDataMapExprWrapper.prune(AndDataMapExprWrapper.java:51)
  at org.apache.carbondata.hadoop.api.CarbonInputFormat.getPrunedBlocklets(CarbonInputFormat.java:515)
  at org.apache.carbondata.hadoop.api.CarbonInputFormat.getDataBlocksOfSegment(CarbonInputFormat.java:412)
  at org.apache.carbondata.hadoop.api.CarbonTableInputFormat.getSplits(CarbonTableInputFormat.java:528)
  at org.apache.carbondata.hadoop.api.CarbonTableInputFormat.getSplits(CarbonTableInputFormat.java:219)
  at org.apache.carbondata.spark.rdd.CarbonScanRDD.internalGetPartitions(CarbonScanRDD.scala:124)
  at org.apache.carbondata.spark.rdd.CarbonRDD.getPartitions(CarbonRDD.scala:61)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
  at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:318)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:91)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
  ... 94 more

scala> carbon.sql("drop datamap bloomfilter_all_dimensions on table store")
18/09/24 05:06:17 AUDIT CarbonDropDataMapCommand: [ec2-dca-aa-p-sdn-16.appannie.org][hadoop][Thread-1]Deleting datamap [bloomfilter_all_dimensions] under table [store]
res1: org.apache.spark.sql.DataFrame = []                                      

scala> carbon.time(carbon.sql(
     |       s"""
     |          |SELECT product_id, sum(est_free_app_download)
     |          |FROM store
     |          |WHERE date BETWEEN '2016-09-01' AND '2016-09-03' AND device_code='ios-phone' AND country_code='EE' AND category_id=100021 AND product_id IN (590416158, 590437560)
     |          |GROUP BY product_id"""
     |         .stripMargin).show(truncate=false)
     |     )
+----------+--------------------------+                                        
|product_id|sum(est_free_app_download)|
+----------+--------------------------+
|590416158 |2                         |
|590437560 |null                      |
+----------+--------------------------+
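So dropping the BloomFilter datamap works around the crash. A possibly less destructive workaround, assuming the datamap-visibility session property from the datamap management docs applies to this version (the exact key is my assumption), would be to hide only the BloomFilter datamap from the optimizer:

// Assumed property key format: carbon.datamap.visible.<db>.<table>.<datamap>
carbon.sql("SET carbon.datamap.visible.default.store.bloomfilter_all_dimensions=false")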


