Hi team,
I'm trying to save a Spark DataFrame to a CarbonData file. I see the example in your wiki uses `option("tableName", "carbontable")`. Does that mean I have to create a CarbonData table first and then save data into the table? Can I save it directly without creating the CarbonData table?

The code is:

```scala
df.write.format("carbondata").mode(SaveMode.Append).save("hdfs:///user/****/data.carbon")
```

BTW, do you have a formal API doc?

Thanks,
Lionel
Hi Lionel
You don't need to create the table first. Please find the example code in ExampleUtils.scala:

```scala
df.write
  .format("carbondata")
  .option("tableName", tableName)
  .option("compress", "true")
  .option("useKettle", "false")
  .mode(mode)
  .save()
```

Preparing the API docs is in progress.

Regards,
Liang
Thank you for the response, Liang. I think I have followed the example, but it still returns an error:

    Data loading failed. table not found: default.carbontest

My code is attached below. I read data from a Hive table with HiveContext, convert it to a CarbonContext, then generate the DataFrame and save it to HDFS. I'm not sure whether it's correct to generate the DataFrame with `sc.parallelize(sc.files, 25)`. Do you have any other method we can use to generate the DataFrame?

```scala
object SparkConvert {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CarbonTest")
    val sc = new SparkContext(conf)
    val path = "hdfs:///user/appuser/lucao/CarbonTest_001.carbon"
    val hqlContext = new HiveContext(sc)
    val df = hqlContext.sql("select * from default.test_data_all")
    println("the count is:" + df.count())
    val cc = createCarbonContext(df.sqlContext.sparkContext, path)
    writeDataFrame(cc, "CarbonTest", SaveMode.Append)
  }

  def createCarbonContext(sc: SparkContext, storePath: String): CarbonContext = {
    val cc = new CarbonContext(sc, storePath)
    cc
  }

  def writeDataFrame(cc: CarbonContext, tableName: String, mode: SaveMode): Unit = {
    import cc.implicits._
    val sc = cc.sparkContext
    val df = sc.parallelize(sc.files, 25).toDF("col1", "col2", "col3", ..., "coln")
    df.write
      .format("carbondata")
      .option("tableName", tableName)
      .option("compress", "true")
      .mode(mode)
      .save()
  }
}
```
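For reference, one common way to build a DataFrame without reading external files is from an in-memory collection via `toDF`. This is only a minimal sketch against the Spark 1.x `SQLContext` API used elsewhere in this thread; the app name, column names, and sample values are placeholders, not from the thread:

```scala
// Sketch: building a DataFrame from a local Seq of tuples (Spark 1.x API).
// Column names "col1"/"col2" and the sample rows are illustrative only.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setAppName("DFSketch")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// toDF on an RDD of tuples assigns the given column names directly
val df = sc.parallelize(Seq((1, "a"), (2, "b")), 2).toDF("col1", "col2")
df.show()
```

A DataFrame built this way can then be passed to the same `df.write.format("carbondata")...` call shown in the examples above.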
Try code like this:

```scala
if (cc.tableNames().filter(f => f == _cfg.get("tableName").get).size == 0) {
  df.sqlContext.sql(s"DROP TABLE IF EXISTS ${_cfg.get("tableName").get}")
  writer.options(_cfg).mode(SaveMode.Overwrite).format(_format).save()
} else {
  writer.options(_cfg).mode(SaveMode.valueOf(_mode)).format(_format).save()
}
```

Only when the table has already been created can you use SaveMode.Append; otherwise you should use SaveMode.Overwrite so that CarbonData creates the table for you.

Best Regards,
WilliamZhu 祝海林
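The rule described in this thread (Overwrite on first write so the table gets created, Append afterwards) can be condensed into a small helper. This is a sketch, not the project's API: it assumes a `CarbonContext` `cc` and a `DataFrame` `df` as in the earlier examples, and that `cc.tableNames()` returns the existing table names as used in the snippet above:

```scala
// Sketch of the first-write-vs-append decision described above.
// Assumes CarbonContext/DataFrame/SaveMode are in scope, per the thread.
def saveToCarbon(cc: CarbonContext, df: DataFrame, tableName: String): Unit = {
  // If the table already exists we may append; otherwise Overwrite
  // lets CarbonData create the table on the first write.
  val exists = cc.tableNames().contains(tableName)
  val mode = if (exists) SaveMode.Append else SaveMode.Overwrite
  df.write
    .format("carbondata")
    .option("tableName", tableName)
    .mode(mode)
    .save()
}
```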