carbon data


carbon data

lionel061201
Hi team,
I'm trying to save a Spark DataFrame to a CarbonData file. I see that the
example in your wiki uses option("tableName", "carbontable"). Does that mean
I have to create a CarbonData table first and then save data into it? Can I
save it directly without creating the table?

The code is:
df.write.format("carbondata").mode(SaveMode.Append).save("hdfs:///user/****/data.carbon")

BTW, do you have a formal API doc?

Thanks,
Lionel

Re: carbon data

Liang Chen
Administrator
Hi Lionel,

You don't need to create the table first. Please see the example code in
ExampleUtils.scala:

df.write
    .format("carbondata")
    .option("tableName", tableName)
    .option("compress", "true")
    .option("useKettle", "false")
    .mode(mode)
    .save()

Preparing the API docs is in progress.

Regards
Liang



--
Regards
Liang

Re: carbon data

lionel061201
Thank you for the response, Liang. I think I have followed the example, but
it still returns an error:
       Data loading failed. table not found: default.carbontest
I've attached my code below. I read data from a Hive table with HiveContext,
convert it to a CarbonContext, then generate the DataFrame and save it to
HDFS. I'm not sure whether it's correct to generate the DataFrame with
sc.parallelize(sc.files, 25). Do you have any other method we can use to
generate the DataFrame?

object SparkConvert {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CarbonTest")
    val sc = new SparkContext(conf)
    val path = "hdfs:///user/appuser/lucao/CarbonTest_001.carbon"
    val hqlContext = new HiveContext(sc)
    val df = hqlContext.sql("select * from default.test_data_all")
    println("the count is:" + df.count())
    val cc = createCarbonContext(df.sqlContext.sparkContext, path)
    writeDataFrame(cc, "CarbonTest", SaveMode.Append)
  }

  def createCarbonContext(sc: SparkContext, storePath: String): CarbonContext = {
    new CarbonContext(sc, storePath)
  }

  def writeDataFrame(cc: CarbonContext, tableName: String, mode: SaveMode): Unit = {
    import cc.implicits._
    val sc = cc.sparkContext
    val df = sc.parallelize(sc.files, 25).toDF("col1", "col2", "col3"..."coln")
    df.write
      .format("carbondata")
      .option("tableName", tableName)
      .option("compress", "true")
      .mode(mode)
      .save()
  }
}

Re: carbon data

ZhuWilliam
Try code like this:
```
if (cc.tableNames().filter(f => f == _cfg.get("tableName").get).size == 0) {
  df.sqlContext.sql(s"DROP TABLE IF EXISTS ${_cfg.get("tableName").get}")
  writer.options(_cfg).mode(SaveMode.Overwrite).format(_format).save()
} else {
  writer.options(_cfg).mode(SaveMode.valueOf(_mode)).format(_format).save()
}
```
You can use SaveMode.Append only once the table has already been created;
otherwise, use SaveMode.Overwrite so that CarbonData creates the table for you.
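The rule above boils down to a small decision: Overwrite on the first write so
CarbonData creates the table, Append once it exists. Here is a minimal,
Spark-free sketch of that mode-selection logic; SaveModeChooser and
existingTables (standing in for cc.tableNames()) are illustrative names, not
part of the CarbonData API:

```scala
// A hypothetical helper mirroring the decision described above.
// `existingTables` stands in for the result of cc.tableNames().
object SaveModeChooser {
  def chooseMode(existingTables: Seq[String], tableName: String): String =
    if (existingTables.contains(tableName)) "Append" else "Overwrite"

  def main(args: Array[String]): Unit = {
    // First write: table absent, so Overwrite lets CarbonData create it.
    assert(chooseMode(Seq.empty, "carbontest") == "Overwrite")
    // Subsequent writes: the table exists, so Append is safe.
    assert(chooseMode(Seq("carbontest"), "carbontest") == "Append")
    println("ok")
  }
}
```

The returned string would map onto SaveMode.Overwrite or SaveMode.Append when
wiring this into the writer call.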




--
Best Regards
WilliamZhu (Zhu Hailin)   [hidden email]