Concurrent data loading issues


Chin Wei
Hi Community,

I am trying to use CarbonData to store data that is ingested continuously
and retained for a long period of time, e.g. 1 year. It needs to load
multiple CSV files concurrently to handle the data throughput. Along the
way, I have come across a couple of issues.
I am testing with the latest CarbonData release, 1.6.1.
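
For reference, this is roughly how I create the session and fire one
load. Just a sketch: the store path, table name, schema and file paths
below are made-up examples, not my real setup.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._

// CarbonSession backed by 16 local cores (master = local[16]).
val spark = SparkSession.builder()
  .master("local[16]")
  .appName("ConcurrentLoadTest")
  .getOrCreateCarbonSession("/tmp/carbonstore")

// Example table; my real schema is wider than this.
spark.sql(
  """CREATE TABLE IF NOT EXISTS sensor_data (
    |  id STRING, ts TIMESTAMP, value DOUBLE)
    |STORED AS carbondata""".stripMargin)

// One LOAD DATA command per CSV file (~100k rows each).
spark.sql(
  "LOAD DATA INPATH '/data/batch_0001.csv' INTO TABLE sensor_data " +
  "OPTIONS('HEADER'='true')")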

The issues:
1. The table becomes inconsistent while concurrent loads are running [
https://issues.apache.org/jira/browse/CARBONDATA-3573]. I patched that
temporarily, proceeded, and then hit issue #2.
2. Table compaction fails when concurrent loads run together with
compaction. Compaction works fine for a while, then fails, and after
that it never works for the table again.
3. Even though I run the data load in multiple threads (4 threads), I
observe at most 2 concurrent loads running at any time (from the Spark
UI Executors page). I am running CarbonData with master = local[16].
4. Loading data into CarbonData becomes slower as the tablestatus file
grows. I believe it will affect queries as well. [posted in another
thread -
http://apache-carbondata-mailing-list-archive.1130556.n5.nabble.com/Data-Load-performance-degrade-when-number-of-segment-increase-tp86031.html
]


Does anyone have experience with concurrent data loading in CarbonData,
and has anyone faced the same issues?
Any pointers on how to fix issue #2?
Any plans or ideas on how to improve issues #3 and #4?

Regards,
Chin Wei

The stacktrace for issue #2:
ERROR CarbonTableCompactor:88 - Exception in compaction thread null
java.lang.NullPointerException
        at org.apache.carbondata.core.datamap.TableDataMap.prune(TableDataMap.java:122)
        at org.apache.carbondata.hadoop.api.CarbonInputFormat.getPrunedBlocklets(CarbonInputFormat.java:590)
        at org.apache.carbondata.hadoop.api.CarbonInputFormat.getDataBlocksOfSegment(CarbonInputFormat.java:503)
        at org.apache.carbondata.hadoop.api.CarbonTableInputFormat.getSplits(CarbonTableInputFormat.java:461)
        at org.apache.carbondata.hadoop.api.CarbonTableInputFormat.getSplits(CarbonTableInputFormat.java:197)
        at org.apache.carbondata.spark.rdd.CarbonMergerRDD$$anonfun$internalGetPartitions$3.apply(CarbonMergerRDD.scala:382)
        at org.apache.carbondata.spark.rdd.CarbonMergerRDD$$anonfun$internalGetPartitions$3.apply(CarbonMergerRDD.scala:363)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at org.apache.carbondata.spark.rdd.CarbonMergerRDD.internalGetPartitions(CarbonMergerRDD.scala:363)
        at org.apache.carbondata.spark.rdd.CarbonRDD.getPartitions(CarbonRDD.scala:68)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
        at org.apache.carbondata.spark.rdd.CarbonTableCompactor.triggerCompaction(CarbonTableCompactor.scala:204)
        at org.apache.carbondata.spark.rdd.CarbonTableCompactor.scanSegmentsAndSubmitJob(CarbonTableCompactor.scala:136)
        at org.apache.carbondata.spark.rdd.CarbonTableCompactor.executeCompaction(CarbonTableCompactor.scala:85)
        at org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$$anon$2.run(CarbonDataRDDFactory.scala:180)
        at org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$.startCompactionThreads(CarbonDataRDDFactory.scala:287)
        at org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand.alterTableForCompaction(CarbonAlterTableCompactionCommand.scala:338)
        at org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand.processData(CarbonAlterTableCompactionCommand.scala:202)
        at org.apache.spark.sql.execution.command.AtomicRunnableCommand$$anonfun$run$3.apply(package.scala:148)
        at org.apache.spark.sql.execution.command.AtomicRunnableCommand$$anonfun$run$3.apply(package.scala:145)
        at org.apache.spark.sql.execution.command.Auditable$class.runWithAudit(package.scala:104)
        at org.apache.spark.sql.execution.command.AtomicRunnableCommand.runWithAudit(package.scala:141)
        at org.apache.spark.sql.execution.command.AtomicRunnableCommand.run(package.scala:145)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
        at org.apache.spark.sql.CarbonSession$$anonfun$sql$1.apply(CarbonSession.scala:91)
        at org.apache.spark.sql.CarbonSession$$anonfun$sql$1.apply(CarbonSession.scala:90)
        at org.apache.spark.sql.CarbonSession.withProfiler(CarbonSession.scala:136)
        at org.apache.spark.sql.CarbonSession.sql(CarbonSession.scala:88)

Re: Concurrent data loading issues

xuchuanyin
Hi, concurrent loading alone will not cause the problem; I tried that
months ago.

From the log, it seems that the problem lies in the compaction that is
automatically triggered after loading.

To solve the problem, I think you can:
1. First turn off auto-compaction to increase loading performance; you
can still fire concurrent loads with that setting.
2. Trigger compaction periodically for the performance of future
queries. Be sure not to fire multiple compactions concurrently; I'm not
sure whether that has problems or not. But I think you can fire
compaction alongside the loading (which I have not tested yet); a rough
sketch follows below.
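
For example, something like this (just a sketch, not verified by me;
sensor_data is a placeholder table name and spark is your CarbonSession):

import org.apache.carbondata.core.util.CarbonProperties

// 1. Disable auto compaction so loads never trigger it themselves.
CarbonProperties.getInstance()
  .addProperty("carbon.enable.auto.load.merge", "false")

// 2. From a single dedicated thread, compact at some period.
spark.sql("ALTER TABLE sensor_data COMPACT 'MINOR'")

Minor compaction merges the recent segments; you can also fire 'MAJOR'
less frequently if needed.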




Re: Concurrent data loading issues

Chin Wei
Thanks for your response.

I am running manual compaction; basically the flow is:
1. Multiple (4) threads run the LOAD DATA command (each load is 100k
rows). I am loading data continuously, up to 10,000 CSV files.
2. After every 10 LOAD DATA commands, the compaction command is called
in another thread, so it runs alongside the data loading (a sketch of
this driver is below).
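
Roughly like this, as a sketch (the table name and file paths are
illustrative; spark is the CarbonSession from my first message):

import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

val pool = Executors.newFixedThreadPool(5) // 4 loaders + 1 for compaction
val loads = new AtomicInteger(0)

// 4 loader threads; each one loads every 4th CSV file (~100k rows each).
for (t <- 0 until 4) pool.submit(new Runnable {
  def run(): Unit = {
    for (i <- t until 10000 by 4) {
      spark.sql(s"LOAD DATA INPATH '/data/batch_$i.csv' INTO TABLE sensor_data")
      // After every 10 loads in total, compact from another thread.
      if (loads.incrementAndGet() % 10 == 0) pool.submit(new Runnable {
        def run(): Unit = spark.sql("ALTER TABLE sensor_data COMPACT 'MINOR'")
      })
    }
  }
})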


I get another error in the LOAD DATA command if I call the 'CLEAN FILES
FOR TABLE' command in another thread alongside the LOAD DATA commands:
 ERROR CarbonLoaderUtil:331 - Entry not found to update
LoadMetadataDetails{loadStatus=Success, loadName='55',
loadStartTime='1573627227495', segmentFile='55_1573627227495.segment'} From
list :: [LoadMetadataDetails{loadStatus=Compacted, loadName='0',
loadStartTime='1573627193399', segmentFile='0_1573627193399.segment'},
LoadMetadataDetails{loadStatus=Success, loadName='0.1',
loadStartTime='1573627205355', segmentFile='0.1_1573627205355.segment'},
LoadMetadataDetails{loadStatus=Success, loadName='10.1',
loadStartTime='1573627205355', segmentFile='10.1_1573627205355.segment'},
LoadMetadataDetails{loadStatus=Success, loadName='20.1',
loadStartTime='1573627216942', segmentFile='20.1_1573627216942.segment'},
LoadMetadataDetails{loadStatus=Success, loadName='50',
loadStartTime='1573627225090', segmentFile='50_1573627225090.segment'},
LoadMetadataDetails{loadStatus=Success, loadName='51',
loadStartTime='1573627225750', segmentFile='51_1573627225750.segment'},
LoadMetadataDetails{loadStatus=Success, loadName='30.1',
loadStartTime='1573627224848', segmentFile='30.1_1573627224848.segment'},
LoadMetadataDetails{loadStatus=Success, loadName='52',
loadStartTime='1573627226311', segmentFile='52_1573627226311.segment'},
LoadMetadataDetails{loadStatus=Success, loadName='53',
loadStartTime='1573627226913', segmentFile='53_1573627226913.segment'},
LoadMetadataDetails{loadStatus=Success, loadName='40.1',
loadStartTime='1573627224848', segmentFile='40.1_1573627224848.segment'},
LoadMetadataDetails{loadStatus=Success, loadName='54',
loadStartTime='1573627227492', segmentFile='54_1573627227492.segment'},
LoadMetadataDetails{loadStatus=Success, loadName='56',
loadStartTime='1573627228051', segmentFile='56_1573627228051.segment'},
LoadMetadataDetails{loadStatus=Insert In Progress, loadName='57',
loadStartTime='1573627228635', segmentFile='null'}]

It seems that segment 55 was added to tablestatus after the CLEAN FILES
command read all the segments from tablestatus, and when the CLEAN FILES
command completed, it wrote the list back without segment 55. Then, when
the load for segment 55 completed, it was unable to update tablestatus
and failed with the error above. In other words, CLEAN FILES performs a
read-modify-write of tablestatus that is not safe against a concurrent
load.
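
For completeness, the clean-up call that triggers this is just the
following, fired from another thread while the loader threads above are
running (same placeholder names as in my earlier sketch):

pool.submit(new Runnable {
  // Interleaving this read-modify-write of tablestatus with a running
  // load is what appears to drop segment 55 from the file.
  def run(): Unit = spark.sql("CLEAN FILES FOR TABLE sensor_data")
})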


