Hi guys:
I am running CarbonData (branch master) + Spark 2.1.1 in yarn-client mode, and an error occurs when I execute a select SQL. The details are as follows.

My env: CarbonData (branch master, 2456 commits) + Spark 2.1.1, running in yarn-client mode.

spark-shell:

/opt/spark2/bin/spark-shell --master yarn --deploy-mode client \
  --files /opt/spark2/conf/log4j_all.properties#log4j.properties,/opt/spark2/conf/carbon.properties \
  --driver-memory 6g --num-executors 6 --executor-memory 5g --executor-cores 1 \
  --driver-library-path :/opt/cloudera/parcels/CDH/lib/hadoop/lib/native \
  --jars /opt/spark2/carbonlib/carbondata_2.11-1.2.0-shade-hadoop2.6.0-cdh5.7.1.jar

carbon.properties:

carbon.storelocation=hdfs://hdtcluster/carbon_store
carbon.ddl.base.hdfs.url=hdfs://hdtcluster/carbon_base_path
carbon.bad.records.action=FORCE
carbon.badRecords.location=/opt/carbondata/badrecords
carbon.use.local.dir=true
carbon.use.multiple.temp.dir=true
carbon.sort.file.buffer.size=20
carbon.graph.rowset.size=100000
carbon.number.of.cores.while.loading=6
carbon.sort.size=500000
carbon.enableXXHash=true
carbon.number.of.cores.while.compacting=2
carbon.compaction.level.threshold=2,4
carbon.major.compaction.size=1024
carbon.enable.auto.load.merge=true
carbon.number.of.cores=4
carbon.inmemory.record.size=120000
carbon.enable.quick.filter=false
carbon.timestamp.format=yyyy-MM-dd HH:mm:ss
carbon.date.format=yyyy-MM-dd
carbon.lock.type=HDFSLOCK
enable.unsafe.columnpage=true

My code:

import org.apache.spark.sql.SaveMode
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._

sc.setLogLevel("DEBUG")

val carbon = SparkSession.builder().appName("TestCarbonData").config(sc.getConf)
  .getOrCreateCarbonSession("hdfs://hdtcluster/carbon_store", "/opt/carbondata/carbon.metastore")
carbon.conf.set("spark.sql.parquet.binaryAsString", true)

val testParquet = carbon.read.parquet("/tmp/cp_hundred_million")
testParquet.createOrReplaceTempView("test_distinct")

val orderedCols = carbon.sql("""
  select chan, acarea, cache, code, rt, ts, fcip, url, size, host,
         bsize, upsize, fvarf, fratio, ua, uabro, uabrov, uaos, uaptfm,
         uadvc, msecdl, refer, pdate, ptime, ftype
  from test_distinct
""")
println(orderedCols.count())

carbon.sql("""
  | CREATE TABLE IF NOT EXISTS carbondata_hundred_million_pr1198 (
  |   chan string,
  |   acarea string,
  |   cache string,
  |   code int,
  |   rt string,
  |   ts int,
  |   fcip string,
  |   url string,
  |   size bigint,
  |   host string,
  |   bsize bigint,
  |   upsize bigint,
  |   fvarf string,
  |   fratio int,
  |   ua string,
  |   uabro string,
  |   uabrov string,
  |   uaos string,
  |   uaptfm string,
  |   uadvc string,
  |   msecdl bigint,
  |   refer string,
  |   pdate string,
  |   ptime string,
  |   ftype string
  | )
  | STORED BY 'carbondata'
  | TBLPROPERTIES('DICTIONARY_INCLUDE'='chan, acarea, cache, rt, ts, fcip, ua, uabro, uabrov, uaos, uaptfm, uadvc, refer, ftype',
  |   'NO_INVERTED_INDEX'='pdate, ptime',
  |   'TABLE_BLOCKSIZE'='512'
  | )
""".stripMargin)

carbon.catalog.listDatabases.show(false)
carbon.catalog.listTables.show(false)

orderedCols.write
  .format("carbondata")
  .option("tableName", "carbondata_hundred_million_pr1198")
  .option("tempCSV", "false")
  .option("compress", "true")
  .option("single_pass", "true")
  .mode(SaveMode.Append)
  .save()

carbon.sql("""
  select count(1) from default.carbondata_hundred_million_pr1198
""").show(100)

carbon.sql("""
  SHOW SEGMENTS FOR TABLE default.carbondata_hundred_million_pr1198 limit 100
""").show
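(Side note, in case it helps others reproduce this: the same keys can also be set programmatically from the shell instead of, or in addition to, shipping carbon.properties via --files. A minimal sketch, assuming the standard CarbonProperties.getInstance().addProperty API; the keys and values below are copied from the carbon.properties listed above.)

import org.apache.carbondata.core.util.CarbonProperties

// Apply a few of the settings from carbon.properties before the first load/query runs.
CarbonProperties.getInstance()
  .addProperty("carbon.storelocation", "hdfs://hdtcluster/carbon_store")
  .addProperty("carbon.lock.type", "HDFSLOCK")
  .addProperty("carbon.bad.records.action", "FORCE")
  .addProperty("carbon.timestamp.format", "yyyy-MM-dd HH:mm:ss")
  .addProperty("carbon.date.format", "yyyy-MM-dd")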
Data loading is successful, but when I execute the select SQL, the following error occurs:

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition
+- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#253L])
   +- *BatchedScan CarbonDatasourceHadoopRelation [ Database name :default, Table name :carbondata_hundred_million_pr1198, Schema :Some(StructType(StructField(chan,StringType,true), StructField(acarea,StringType,true), StructField(cache,StringType,true), StructField(code,IntegerType,true), StructField(rt,StringType,true), StructField(ts,IntegerType,true), StructField(fcip,StringType,true), StructField(url,StringType,true), StructField(size,LongType,true), StructField(host,StringType,true), StructField(bsize,LongType,true), StructField(upsize,LongType,true), StructField(fvarf,StringType,true), StructField(fratio,IntegerType,true), StructField(ua,StringType,true), StructField(uabro,StringType,true), StructField(uabrov,StringType,true), StructField(uaos,StringType,true), StructField(uaptfm,StringType,true), StructField(uadvc,StringType,true), StructField(msecdl,LongType,true), StructField(refer,StringType,true), StructField(pdate,StringType,true), StructField(ptime,StringType,true), StructField(ftype,StringType,true))) ] default.carbondata_hundred_million_pr1198[]
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
  at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:112)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
  at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:235)
  at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:141)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:368)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:225)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:308)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2386)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2788)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2385)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2392)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2128)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2127)
  at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2818)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2127)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2342)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:638)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:597)
  ... 53 elided
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field scala.collection.convert.Wrappers$SeqWrapper.underlying of type scala.collection.Seq in instance of scala.collection.convert.Wrappers$SeqWrapper
  at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
  at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
  at org.apache.carbondata.hadoop.util.ObjectSerializationUtil.convertStringToObject(ObjectSerializationUtil.java:99)
  at org.apache.carbondata.hadoop.api.CarbonTableInputFormat.getTableInfo(CarbonTableInputFormat.java:124)
  at org.apache.carbondata.hadoop.api.CarbonTableInputFormat.getOrCreateCarbonTable(CarbonTableInputFormat.java:134)
  at org.apache.carbondata.hadoop.api.CarbonTableInputFormat.getSplits(CarbonTableInputFormat.java:268)
  at org.apache.carbondata.spark.rdd.CarbonScanRDD.getPartitions(CarbonScanRDD.scala:82)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
  at org.apache.spark.sql.execution.exchange.ShuffleExchange$.prepareShuffleDependency(ShuffleExchange.scala:261)
  at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:84)
  at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:121)
  at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:112)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
  ... 85 more

When I ran CarbonData (branch master, 2445 commits) + Spark 2.1.1, it was successful. Please help me. Thanks.
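(For readers unfamiliar with the two class names in the "Caused by": the failure happens while CarbonTableInputFormat.getTableInfo deserializes the serialized table info through ObjectSerializationUtil.convertStringToObject, and a "cannot assign instance of List$SerializationProxy to Wrappers$SeqWrapper.underlying" error at that point usually indicates the object stream is being read back with a classloader that resolves the Scala collection classes differently from the one that wrote them. The snippet below is a minimal, self-contained sketch in plain Scala, not CarbonData code, showing where those two classes come from.)

// A Scala List exposed to Java code through JavaConverters is wrapped in
// scala.collection.convert.Wrappers$SeqWrapper, and the List held in its
// "underlying" field is written to the object stream as
// scala.collection.immutable.List$SerializationProxy.
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.JavaConverters._

val wrapped: java.util.List[String] = List("chan", "acarea", "cache").asJava

val buffer = new ByteArrayOutputStream()
val out = new ObjectOutputStream(buffer)
out.writeObject(wrapped)
out.close()

// Reading the bytes back with the same classloader restores the wrapper fine;
// in the failing case above, readObject resolves the classes differently and
// the proxy cannot be assigned to SeqWrapper.underlying.
val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
val restored = in.readObject().asInstanceOf[java.util.List[String]]
println(restored)  // [chan, acarea, cache]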
This problem has been resolved by Jacky; please see PR https://github.com/apache/carbondata/pull/1211.
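(For anyone hitting the same stack trace: assuming you rebuild the shaded carbondata jar passed via --jars from a master branch that includes that PR, the query that failed above should simply return the row count again.)

// Re-run the query that previously threw the TreeNodeException;
// with the fix in place it should print the count.
carbon.sql("select count(1) from default.carbondata_hundred_million_pr1198").show()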