Hi Community,
I am trying to use CarbonData to store data that is ingested continuously and retained for a long period of time, e.g. one year. This requires loading multiple CSV files concurrently to handle the data throughput. Along the way, I have come across a couple of issues. I am testing with the latest CarbonData release, 1.6.1. The issues:

1. The table becomes inconsistent when concurrent loads are running [https://issues.apache.org/jira/browse/CARBONDATA-3573]. I fixed that temporarily, proceeded, and ran into issue #2.
2. Table compaction fails when concurrent loads run together with compaction. Compaction works fine for a while and then fails, and after that compaction never works for the table again.
3. Even though I run the data load in multiple threads (4), I observe at most 2 concurrent loads running at any time (from the Spark UI Executors page). I am running CarbonData with master = local[16].
4. Loading data into CarbonData becomes slower as the tablestatus file grows. I believe it affects queries as well. [posted in another thread - http://apache-carbondata-mailing-list-archive.1130556.n5.nabble.com/Data-Load-performance-degrade-when-number-of-segment-increase-tp86031.html]

Does anyone have experience with concurrent data loading in CarbonData, and has anyone faced the same issues? Any pointer on how to fix issue #2? Any plan or idea on how to improve issues #3 and #4?
Regards,
Chin Wei

The stacktrace for issue #2:

ERROR CarbonTableCompactor:88 - Exception in compaction thread null
java.lang.NullPointerException
    at org.apache.carbondata.core.datamap.TableDataMap.prune(TableDataMap.java:122)
    at org.apache.carbondata.hadoop.api.CarbonInputFormat.getPrunedBlocklets(CarbonInputFormat.java:590)
    at org.apache.carbondata.hadoop.api.CarbonInputFormat.getDataBlocksOfSegment(CarbonInputFormat.java:503)
    at org.apache.carbondata.hadoop.api.CarbonTableInputFormat.getSplits(CarbonTableInputFormat.java:461)
    at org.apache.carbondata.hadoop.api.CarbonTableInputFormat.getSplits(CarbonTableInputFormat.java:197)
    at org.apache.carbondata.spark.rdd.CarbonMergerRDD$$anonfun$internalGetPartitions$3.apply(CarbonMergerRDD.scala:382)
    at org.apache.carbondata.spark.rdd.CarbonMergerRDD$$anonfun$internalGetPartitions$3.apply(CarbonMergerRDD.scala:363)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at org.apache.carbondata.spark.rdd.CarbonMergerRDD.internalGetPartitions(CarbonMergerRDD.scala:363)
    at org.apache.carbondata.spark.rdd.CarbonRDD.getPartitions(CarbonRDD.scala:68)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at org.apache.carbondata.spark.rdd.CarbonTableCompactor.triggerCompaction(CarbonTableCompactor.scala:204)
    at org.apache.carbondata.spark.rdd.CarbonTableCompactor.scanSegmentsAndSubmitJob(CarbonTableCompactor.scala:136)
    at org.apache.carbondata.spark.rdd.CarbonTableCompactor.executeCompaction(CarbonTableCompactor.scala:85)
    at org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$$anon$2.run(CarbonDataRDDFactory.scala:180)
    at org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$.startCompactionThreads(CarbonDataRDDFactory.scala:287)
    at org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand.alterTableForCompaction(CarbonAlterTableCompactionCommand.scala:338)
    at org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand.processData(CarbonAlterTableCompactionCommand.scala:202)
    at org.apache.spark.sql.execution.command.AtomicRunnableCommand$$anonfun$run$3.apply(package.scala:148)
    at org.apache.spark.sql.execution.command.AtomicRunnableCommand$$anonfun$run$3.apply(package.scala:145)
    at org.apache.spark.sql.execution.command.Auditable$class.runWithAudit(package.scala:104)
    at org.apache.spark.sql.execution.command.AtomicRunnableCommand.runWithAudit(package.scala:141)
    at org.apache.spark.sql.execution.command.AtomicRunnableCommand.run(package.scala:145)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
    at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
    at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
    at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
    at org.apache.spark.sql.CarbonSession$$anonfun$sql$1.apply(CarbonSession.scala:91)
    at org.apache.spark.sql.CarbonSession$$anonfun$sql$1.apply(CarbonSession.scala:90)
    at org.apache.spark.sql.CarbonSession.withProfiler(CarbonSession.scala:136)
    at org.apache.spark.sql.CarbonSession.sql(CarbonSession.scala:88)
Hi,

Concurrent load should not cause the problem; I tried it months ago. From the log, the problem seems to lie in the compaction that is automatically triggered after loading. To solve the problem, I think you can:

1. First, turn off auto-compaction to increase loading performance; you can fire concurrent loads here.
2. Trigger compaction periodically to keep future query performance up. Be sure not to fire multiple compactions concurrently; I am not sure whether that causes problems or not. However, I think you can fire a compaction alongside the loading (which I have not tested yet).

-- Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/
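The pattern suggested above (auto-compaction off, concurrent loads, one manual compaction at a time) can be sketched as a small simulation. This is not CarbonData code: `run_sql` is a hypothetical stand-in for submitting a statement through `session.sql(...)`, and `demo_table` and the CSV paths are made up. Only the SQL syntax (`LOAD DATA INPATH`, `ALTER TABLE ... COMPACT`) and the property `carbon.enable.auto.load.merge` come from CarbonData itself; the scheduling with locks is just an illustration of the advice.

```python
import threading

issued = []                          # record of submitted statements (for illustration)
issued_lock = threading.Lock()
compaction_lock = threading.Lock()   # serialize compaction: never fire two at once

def run_sql(statement):
    """Hypothetical stand-in for session.sql(statement)."""
    with issued_lock:
        issued.append(statement)

def load_worker(csv_path):
    run_sql(f"LOAD DATA INPATH '{csv_path}' INTO TABLE demo_table")

def compact():
    # Only one compaction at a time, per the advice above.
    with compaction_lock:
        run_sql("ALTER TABLE demo_table COMPACT 'MINOR'")

# Step 1: auto-compaction is assumed off (carbon.properties:
#   carbon.enable.auto.load.merge=false), so loads do not trigger merges.
# Step 2: fire concurrent loads.
threads = [threading.Thread(target=load_worker, args=(f"/data/part_{i}.csv",))
           for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# Step 3: trigger compaction periodically, serialized by compaction_lock.
compact()
```

The lock around `compact()` only prevents two compactions from overlapping; whether a compaction may safely overlap a load is, as noted above, untested.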
Thanks for your response.
I am running manual compaction. Basically, the flow is:

1. Multiple (4) threads run the LOAD DATA command (each load is 100k rows). I am loading data continuously, up to 10,000 CSV files.
2. For every 10 LOAD DATA commands, the compaction command is called in another thread, alongside the data loading.

I get another error in the LOAD DATA command if I call the 'CLEAN FILES FOR TABLE' command in another thread alongside the LOAD DATA command:

ERROR CarbonLoaderUtil:331 - Entry not found to update
LoadMetadataDetails{loadStatus=Success, loadName='55', loadStartTime='1573627227495', segmentFile='55_1573627227495.segment'}
From list :: [
LoadMetadataDetails{loadStatus=Compacted, loadName='0', loadStartTime='1573627193399', segmentFile='0_1573627193399.segment'},
LoadMetadataDetails{loadStatus=Success, loadName='0.1', loadStartTime='1573627205355', segmentFile='0.1_1573627205355.segment'},
LoadMetadataDetails{loadStatus=Success, loadName='10.1', loadStartTime='1573627205355', segmentFile='10.1_1573627205355.segment'},
LoadMetadataDetails{loadStatus=Success, loadName='20.1', loadStartTime='1573627216942', segmentFile='20.1_1573627216942.segment'},
LoadMetadataDetails{loadStatus=Success, loadName='50', loadStartTime='1573627225090', segmentFile='50_1573627225090.segment'},
LoadMetadataDetails{loadStatus=Success, loadName='51', loadStartTime='1573627225750', segmentFile='51_1573627225750.segment'},
LoadMetadataDetails{loadStatus=Success, loadName='30.1', loadStartTime='1573627224848', segmentFile='30.1_1573627224848.segment'},
LoadMetadataDetails{loadStatus=Success, loadName='52', loadStartTime='1573627226311', segmentFile='52_1573627226311.segment'},
LoadMetadataDetails{loadStatus=Success, loadName='53', loadStartTime='1573627226913', segmentFile='53_1573627226913.segment'},
LoadMetadataDetails{loadStatus=Success, loadName='40.1', loadStartTime='1573627224848', segmentFile='40.1_1573627224848.segment'},
LoadMetadataDetails{loadStatus=Success, loadName='54', loadStartTime='1573627227492', segmentFile='54_1573627227492.segment'},
LoadMetadataDetails{loadStatus=Success, loadName='56', loadStartTime='1573627228051', segmentFile='56_1573627228051.segment'},
LoadMetadataDetails{loadStatus=Insert In Progress, loadName='57', loadStartTime='1573627228635', segmentFile='null'}]

It seems that segment 55 was added to tablestatus after the CLEAN FILES command had read all the segments from tablestatus, and when CLEAN FILES completed, it did not write segment 55 back into tablestatus. Then, when the segment 55 load completed, it was unable to update tablestatus, producing that error.

-- Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/
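The lost-update interleaving described above (CLEAN FILES reads tablestatus, a concurrent load commits segment 55, then CLEAN FILES writes back its stale snapshot) can be reproduced as a small simulation. This is not CarbonData code: tablestatus is modeled as a plain list of segment names, and the events only force the exact interleaving from the report. It also shows how serializing the read-modify-write cycle with a lock would keep the committed segment.

```python
import threading

def run_scenario(use_lock):
    # tablestatus modeled as a list of segment names; "CLEAN FILES" is a
    # read-modify-write cycle, the concurrent load commit is a single append.
    status = ["0", "0.1", "10.1"]
    lock = threading.Lock()
    in_window = threading.Event()   # CLEAN FILES has read, not yet written back
    resume = threading.Event()      # tell CLEAN FILES to write back

    def clean_files():
        if use_lock:
            lock.acquire()
        snapshot = list(status)     # read the whole status file
        in_window.set()
        resume.wait()               # ...time passes before the write-back...
        status[:] = snapshot        # write back the (possibly stale) snapshot
        if use_lock:
            lock.release()

    def load_commit():
        if use_lock:
            with lock:              # blocked until CLEAN FILES finishes
                status.append("55")
        else:
            status.append("55")     # lands inside the window: will be overwritten

    cleaner = threading.Thread(target=clean_files)
    cleaner.start()
    in_window.wait()
    loader = threading.Thread(target=load_commit)
    loader.start()
    if not use_lock:
        loader.join()               # force the append into the window
    resume.set()
    cleaner.join()
    loader.join()
    return status

print(run_scenario(use_lock=False))  # segment "55" is lost, matching the error
print(run_scenario(use_lock=True))   # segment "55" survives the cleanup
```

In the unlocked run, the write-back of the stale snapshot erases segment 55 from the status list, after which any attempt to update that entry would fail with exactly the "Entry not found to update" symptom above.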