
Concurrent data loading issues

Posted by Chin Wei on Nov 12, 2019; 3:31am
URL: http://apache-carbondata-dev-mailing-list-archive.168.s1.nabble.com/Concurrent-data-loading-issues-tp86108.html

Hi Community,

I am trying to use CarbonData to store data that is ingested continuously and
retained for a long period, e.g. 1 year. Multiple CSV files need to be loaded
concurrently to keep up with the data throughput. Along the
way, I have come across a couple of issues.
I am testing with the latest CarbonData release, 1.6.1.

The issues:
1. The table becomes inconsistent when concurrent loads are running [
https://issues.apache.org/jira/browse/CARBONDATA-3573]. I applied a temporary
fix for that, proceeded, and ran into issue #2.
2. Table compaction fails when it runs together with concurrent loads.
Compaction works fine for a while and then fails; after that, compaction
never works for the table again.
3. Even though I run the data load in multiple threads (4 threads), I
observe at most 2 concurrent loads running at any
time (from the Spark UI Executors page). I am running CarbonData with
master = local[16].
4. Loading data into CarbonData becomes slower as the tablestatus file
grows. I believe it will affect queries as well. [posted in
another thread -
http://apache-carbondata-mailing-list-archive.1130556.n5.nabble.com/Data-Load-performance-degrade-when-number-of-segment-increase-tp86031.html
]
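
For anyone trying to reproduce issue #3, here is a minimal sketch of the load pattern: several threads each submit one LOAD DATA statement, and a small counter records how many submissions are in flight at once. It is written in Python and is not my actual loader code. `run_sql` is a hypothetical stand-in for the session's sql(...) call, and the CSV paths and the table name `events` are placeholders. The counter only measures how many statements the client has submitted concurrently, not how many load jobs Spark actually runs, so comparing it with the Spark UI may help to tell a client-side limit from one inside the load path.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class ConcurrencyProbe:
    """Tracks how many calls are in flight at once and remembers the peak."""

    def __init__(self):
        self._lock = threading.Lock()
        self._active = 0
        self.peak = 0

    def __enter__(self):
        with self._lock:
            self._active += 1
            self.peak = max(self.peak, self._active)
        return self

    def __exit__(self, *exc):
        with self._lock:
            self._active -= 1
        return False


def load_statement(csv_path, table):
    # Basic LOAD DATA form; the path and table name are placeholders.
    return f"LOAD DATA INPATH '{csv_path}' INTO TABLE {table}"


def load_concurrently(run_sql, csv_paths, table, threads=4):
    """Submit one LOAD DATA per CSV from a fixed-size thread pool.

    run_sql is a hypothetical callable that executes one SQL statement.
    Returns the loaded paths and the peak number of in-flight submissions.
    """
    probe = ConcurrencyProbe()

    def load_one(path):
        with probe:
            run_sql(load_statement(path, table))
        return path

    with ThreadPoolExecutor(max_workers=threads) as pool:
        loaded = list(pool.map(load_one, csv_paths))
    return loaded, probe.peak


def fake_run_sql(statement):
    # Stand-in so the sketch runs without Spark; a real run would call
    # the session's sql(statement) here instead.
    time.sleep(0.05)


if __name__ == "__main__":
    paths = [f"/data/in/part-{i}.csv" for i in range(8)]
    loaded, peak = load_concurrently(fake_run_sql, paths, "events", threads=4)
    print(f"loaded {len(loaded)} files, peak concurrent submissions: {peak}")
```

If the peak reported here reaches 4 while the Spark UI still shows only 2 loads running, the threads are submitting in parallel and the serialization is happening somewhere after submission.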


Does anyone have experience with concurrent data loading in CarbonData, and
has anyone faced the same issues?
Any pointers on how to fix issue #2?
Are there any plans or ideas for improving issues #3 and #4?

Regards,
Chin Wei

The stack trace for issue #2:
ERROR CarbonTableCompactor:88 - Exception in compaction thread null
java.lang.NullPointerException
        at org.apache.carbondata.core.datamap.TableDataMap.prune(TableDataMap.java:122)
        at org.apache.carbondata.hadoop.api.CarbonInputFormat.getPrunedBlocklets(CarbonInputFormat.java:590)
        at org.apache.carbondata.hadoop.api.CarbonInputFormat.getDataBlocksOfSegment(CarbonInputFormat.java:503)
        at org.apache.carbondata.hadoop.api.CarbonTableInputFormat.getSplits(CarbonTableInputFormat.java:461)
        at org.apache.carbondata.hadoop.api.CarbonTableInputFormat.getSplits(CarbonTableInputFormat.java:197)
        at org.apache.carbondata.spark.rdd.CarbonMergerRDD$$anonfun$internalGetPartitions$3.apply(CarbonMergerRDD.scala:382)
        at org.apache.carbondata.spark.rdd.CarbonMergerRDD$$anonfun$internalGetPartitions$3.apply(CarbonMergerRDD.scala:363)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at org.apache.carbondata.spark.rdd.CarbonMergerRDD.internalGetPartitions(CarbonMergerRDD.scala:363)
        at org.apache.carbondata.spark.rdd.CarbonRDD.getPartitions(CarbonRDD.scala:68)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
        at org.apache.carbondata.spark.rdd.CarbonTableCompactor.triggerCompaction(CarbonTableCompactor.scala:204)
        at org.apache.carbondata.spark.rdd.CarbonTableCompactor.scanSegmentsAndSubmitJob(CarbonTableCompactor.scala:136)
        at org.apache.carbondata.spark.rdd.CarbonTableCompactor.executeCompaction(CarbonTableCompactor.scala:85)
        at org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$$anon$2.run(CarbonDataRDDFactory.scala:180)
        at org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$.startCompactionThreads(CarbonDataRDDFactory.scala:287)
        at org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand.alterTableForCompaction(CarbonAlterTableCompactionCommand.scala:338)
        at org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand.processData(CarbonAlterTableCompactionCommand.scala:202)
        at org.apache.spark.sql.execution.command.AtomicRunnableCommand$$anonfun$run$3.apply(package.scala:148)
        at org.apache.spark.sql.execution.command.AtomicRunnableCommand$$anonfun$run$3.apply(package.scala:145)
        at org.apache.spark.sql.execution.command.Auditable$class.runWithAudit(package.scala:104)
        at org.apache.spark.sql.execution.command.AtomicRunnableCommand.runWithAudit(package.scala:141)
        at org.apache.spark.sql.execution.command.AtomicRunnableCommand.run(package.scala:145)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
        at org.apache.spark.sql.CarbonSession$$anonfun$sql$1.apply(CarbonSession.scala:91)
        at org.apache.spark.sql.CarbonSession$$anonfun$sql$1.apply(CarbonSession.scala:90)
        at org.apache.spark.sql.CarbonSession.withProfiler(CarbonSession.scala:136)
        at org.apache.spark.sql.CarbonSession.sql(CarbonSession.scala:88)