When I insert data into a CarbonData table from another table, I have to do the following:
1. select count(1) from table1
and then
2. insert into table table1 select * from table1

Why should I execute "select count(1) from table1" first? Because the number of tasks is computed by CarbonData, and it is related to how many executor hosts we have at that moment!

I don't think this is the right way. We should let Spark control the number of tasks; setting the parameter "mapred.max.split.size" is the common way to adjust the number of tasks.

Even when I do step 2, some tasks still fail, which increases the insert time.

So I suggest that we don't adjust the number of tasks and just use the default behavior of Spark. Then, if there are small files, add a fast merge job (merging data at the blocklet level).

We would also need to set the default value of "carbon.number.of.cores.while.loading" to 1.
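Roughly, the kind of setup this proposal implies, sketched against a CarbonSession named "carbon"; the property names are real, but the 256 MB split size and the call site are only illustrative, not a tested recipe:

  import org.apache.carbondata.core.util.CarbonProperties

  // Let the input split size (and therefore Spark's task count) follow the data size.
  // 256 MB is just an example value.
  carbon.sparkContext.hadoopConfiguration
    .set("mapred.max.split.size", (256L * 1024 * 1024).toString)

  // Proposed default: a single loading core per task.
  CarbonProperties.getInstance()
    .addProperty("carbon.number.of.cores.while.loading", "1")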
Hi,
I am not getting the intention behind this proposal. Is it because of the loading failure? If yes, we should find out why the loading failed. If not, then what is the intention?

Actually I think the "carbon.number.of.cores.while.loading" property should be marked as obsolete (a rough sketch follows at the end of this mail):
GLOBAL_SORT and NO_SORT should use Spark's default behavior.
LOCAL_SORT and BATCH_SORT should use "sparkSession.sparkContext.defaultParallelism" as the number of cores for local sorting.

Regards,
Jacky
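A rough sketch of the behavior described above; the sort-scope names match CarbonData's load options, but the helper itself is purely illustrative, not the actual implementation:

  def loadingCores(spark: org.apache.spark.sql.SparkSession, sortScope: String): Option[Int] =
    sortScope.toUpperCase match {
      case "GLOBAL_SORT" | "NO_SORT" =>
        None // no override: let Spark's default task scheduling decide
      case "LOCAL_SORT" | "BATCH_SORT" =>
        Some(spark.sparkContext.defaultParallelism) // cores used for local sorting
      case _ =>
        None // unknown scope: fall back to Spark defaults
    }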
Hi, Jacky
Because the current behavior can't take full advantage of my resources and fails randomly; I want to make the insert process faster and more stable. Yes, you are right, we should also find out why the loading failed.

I don't think LOCAL_SORT and BATCH_SORT should use "sparkSession.sparkContext.defaultParallelism". Using Spark's default behavior is more reasonable, because it is based on the data size, so users don't need to adjust anything when the data grows.

Best regards!
Yuhai Cen
I thought about this issue, and found that there is actually a usability problem Carbon can improve.

The problem is that currently the user is forced to set the "carbon.number.of.cores.while.loading" carbon property before loading; this creates overhead for the user, and usability is not good.

To solve this, the following changes should be made (a rough sketch follows at the end of this mail):
1. Carbon should take the minimum of the "carbon.number.of.cores.while.loading" property and "spark.executor.cores" in the Spark conf, instead of taking "carbon.number.of.cores.while.loading" directly.
2. The default value of "carbon.number.of.cores.while.loading" should be large, so that Carbon has a higher chance of taking "spark.executor.cores" as the loading cores. The current default value is 2, which I think is too small.

Regards,
Jacky
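In code, change (1) would look roughly like the following; the property names exist today, but the method itself and the "32" default are only illustrative of the idea, not the actual implementation:

  import org.apache.carbondata.core.util.CarbonProperties

  def effectiveLoadingCores(spark: org.apache.spark.sql.SparkSession): Int = {
    // Proposed large default so that spark.executor.cores usually wins the min()
    val configured = CarbonProperties.getInstance()
      .getProperty("carbon.number.of.cores.while.loading", "32")
    val executorCores = spark.sparkContext.getConf.get("spark.executor.cores", "1")
    math.min(configured.toInt, executorCores.toInt)
  }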
Why do users need to set "carbon.number.of.cores.while.loading"? Because the loading process is too slow. The key point is that it is slow!
I also have the problem that I have to increase the value of "spark.dynamicAllocation.maxExecutors" when the data becomes larger...

Best regards!
Yuhai Cen
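P.S. For reference, these are the Spark settings involved; they are normally passed as --conf flags when launching spark-shell or spark-submit, and the value 200 below is only an example, not a recommendation:

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.shuffle.service.enabled", "true")       // required for dynamic allocation
    .set("spark.dynamicAllocation.maxExecutors", "200") // raise as the data grows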
Hi, Jacky
Do you know my pain?

scala> carbon.sql("insert overwrite table dm_test.dm_trd_wide_carbondata select * from dm.dm_trd_wide where dt = '2017-10-10' ").collect
17/10/30 12:19:27 AUDIT [org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$(207) -- main]: [sh-hadoop-datanode-250-104.elenet.me][master][Thread-1]Data load request has been received for table dm_test.dm_trd_wide_carbondata
[Stage 0:> (0 + 85) / 85]
17/10/30 12:19:39 ERROR [org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$(143) -- main]: main load data frame failed
org.apache.spark.SparkException: Job 0 cancelled as part of cancellation of all jobs
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1441)
  at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1381)
  at org.apache.spark.scheduler.DAGScheduler.doCancelAllJobs(DAGScheduler.scala:726)
  ...
  at org.apache.spark.rdd.RDD.collect(RDD.scala:943)
  at org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$.loadDataFrame$1(CarbonDataRDDFactory.scala:761)
  at org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$.loadCarbonData(CarbonDataRDDFactory.scala:922)
  at org.apache.spark.sql.execution.command.LoadTable.processData(carbonTableSchema.scala:1046)
  at org.apache.spark.sql.execution.command.LoadTable.run(carbonTableSchema.scala:754)
  at org.apache.spark.sql.execution.command.LoadTableByInsert.processData(carbonTableSchema.scala:651)
  at org.apache.spark.sql.execution.command.LoadTableByInsert.run(carbonTableSchema.scala:637)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:619)
  ...
17/10/30 12:19:39 ERROR [org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$(143) -- main]: main
org.apache.spark.SparkException: Job 0 cancelled as part of cancellation of all jobs
  ...
17/10/30 12:19:39 AUDIT [org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$(207) -- main]: [sh-hadoop-datanode-250-104.elenet.me][master][Thread-1]Data load is failed for dm_test.dm_trd_wide_carbondata
17/10/30 12:19:39 ERROR [org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile(141) -- main]: main Exception occurred:File does not exist: hdfs://bipcluster/user/master/carbon/store/dm_test/dm_trd_wide_carbondata/Fact/Part0/Segment_0
17/10/30 12:19:39 ERROR [org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile(141) -- main]: main Exception occurred:File does not exist: hdfs://bipcluster/user/master/carbon/store/dm_test/dm_trd_wide_carbondata/Fact/Part0/Segment_1
17/10/30 12:19:39 ERROR [org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile(141) -- main]: main Exception occurred:File does not exist: hdfs://bipcluster/user/master/carbon/store/dm_test/dm_trd_wide_carbondata/Fact/Part0/Segment_2
17/10/30 12:19:39 ERROR [org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile(141) -- main]: main Exception occurred:File does not exist: hdfs://bipcluster/user/master/carbon/store/dm_test/dm_trd_wide_carbondata/Fact/Part0/Segment_3
17/10/30 12:19:39 ERROR [org.apache.spark.sql.execution.command.LoadTable(143) -- main]: main
java.lang.NullPointerException
  at org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile.isDirectory(AbstractDFSCarbonFile.java:88)
  at org.apache.carbondata.core.util.CarbonUtil.deleteRecursive(CarbonUtil.java:364)
  at org.apache.carbondata.core.util.CarbonUtil.deleteFoldersAndFiles(CarbonUtil.java:322)
  at org.apache.carbondata.spark.load.CarbonLoaderUtil.recordLoadMetadata(CarbonLoaderUtil.java:331)
  at org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$.updateStatus$1(CarbonDataRDDFactory.scala:595)
  at org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$.loadCarbonData(CarbonDataRDDFactory.scala:1078)
  at org.apache.spark.sql.execution.command.LoadTable.processData(carbonTableSchema.scala:1046)
  at org.apache.spark.sql.execution.command.LoadTable.run(carbonTableSchema.scala:754)
  at org.apache.spark.sql.execution.command.LoadTableByInsert.processData(carbonTableSchema.scala:651)
  at org.apache.spark.sql.execution.command.LoadTableByInsert.run(carbonTableSchema.scala:637)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:619)
  ...
17/10/30 12:19:39 AUDIT [org.apache.spark.sql.execution.command.LoadTable(207) -- main]: [sh-hadoop-datanode-250-104.elenet.me][master][Thread-1]Dataload failure for dm_test.dm_trd_wide_carbondata. Please check the logs
java.lang.NullPointerException
  at org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile.isDirectory(AbstractDFSCarbonFile.java:88)
  ... 51 elided

scala> carbon.sql("insert overwrite table dm_test.dm_trd_wide_carbondata select * from dm.dm_trd_wide where dt = '2017-10-10' ").collect
17/10/30 12:19:44 AUDIT [org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$(207) -- main]: [sh-hadoop-datanode-250-104.elenet.me][master][Thread-1]Data load request has been received for table dm_test.dm_trd_wide_carbondata
[Stage 1:> (0 + 434) / 434]

Best regards!
Yuhai Cen
> On 30 Oct 2017, at 9:47 AM, 岑玉海 <[hidden email]> wrote:
> Why do users need to set "carbon.number.of.cores.while.loading"? Because the loading process is too slow. The key point is that it is slow!

Yes, I agree. What I am suggesting in the other mail is that Carbon should take care of it if the user does not set "carbon.number.of.cores.while.loading":
1. Carbon should take the minimum of the "carbon.number.of.cores.while.loading" property and "spark.executor.cores" in the Spark conf, instead of taking "carbon.number.of.cores.while.loading" directly.
2. The default value of "carbon.number.of.cores.while.loading" should be large, so that Carbon has a higher chance of taking "spark.executor.cores" as the loading cores. The current default value is 2, which I think is too small.

I think this change will solve your problem.
This still cannot resolve my problem. The number of tasks is not stable; sometimes there are only 10 tasks... Can 10 tasks do the work? I think maybe they can, but it will take a long time...
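A rough illustration of why I prefer Spark's default behavior: when the source is read through Spark's own file scan, the number of tasks follows the data size via spark.sql.files.maxPartitionBytes rather than the number of executor hosts. The 256 MB value below is only an example.

  // Tasks are sized by split bytes, so the count grows with the data volume.
  carbon.conf.set("spark.sql.files.maxPartitionBytes", (256L * 1024 * 1024).toString)
  val df = carbon.sql("select * from dm.dm_trd_wide where dt = '2017-10-10'")
  println(s"input partitions = ${df.rdd.getNumPartitions}")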
Best regards!
Yuhai Cen