[ https://issues.apache.org/jira/browse/CARBONDATA-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Geetika Gupta reassigned CARBONDATA-2354: ----------------------------------------- Assignee: Geetika Gupta > Getting Error while executing Streaming example from the streaming guide documentation > -------------------------------------------------------------------------------------- > > Key: CARBONDATA-2354 > URL: https://issues.apache.org/jira/browse/CARBONDATA-2354 > Project: CarbonData > Issue Type: Bug > Components: examples > Affects Versions: 1.4.0 > Environment: spark 2.1, spark 2.2 > Reporter: Vandana Yadav > Assignee: Geetika Gupta > Priority: Major > > Getting Error while executing Streaming example from the streaming guide documentation > Steps to reproduce: > 1) Run spark shell with latest build using: > ./spark-shell --jars /home/knoldus/Desktop/CARBONDATA/carbondata/assembly/target/scala-2.11-carbondata-1.4.0-SNAPSHOT-bin-spark2.1.0-hadoop2.7.2.jar > Execute the example: > :paste > // Entering paste mode (ctrl-D to finish) > import java.io.File > import org.apache.spark.sql.\{CarbonEnv, SparkSession} > import org.apache.spark.sql.CarbonSession._ > import org.apache.spark.sql.streaming.\{ProcessingTime, StreamingQuery} > import org.apache.carbondata.core.util.path.CarbonStorePath > > val warehouse = new File("./warehouse").getCanonicalPath > val metastore = new File("./metastore").getCanonicalPath > > val spark = SparkSession > .builder() > .master("local") > .appName("StreamExample") > .config("spark.sql.warehouse.dir", warehouse) > .getOrCreateCarbonSession(warehouse, metastore) > spark.sparkContext.setLogLevel("ERROR") > // drop table if exists previously > spark.sql(s"DROP TABLE IF EXISTS carbon_table") > // Create target carbon table and populate with initial data > spark.sql( > s""" > | CREATE TABLE carbon_table ( > | col1 INT, > | col2 STRING > | ) > | STORED BY 'carbondata' > | TBLPROPERTIES('streaming'='true')""".stripMargin) > val carbonTable = CarbonEnv.getCarbonTable(Some("default"), "carbon_table")(spark) > val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) > > // batch load > var qry: StreamingQuery = null > val readSocketDF = spark.readStream > .format("socket") > .option("host", "localhost") > .option("port", 9099) > .load() > // Write data from socket stream to carbondata file > qry = readSocketDF.writeStream > .format("carbondata") > .trigger(ProcessingTime("5 seconds")) > .option("checkpointLocation", tablePath.getStreamingCheckpointDir) > .option("dbName", "default") > .option("tableName", "carbon_table") > .start() > // start new thread to show data > new Thread() { > override def run(): Unit = { > do { > spark.sql("select * from carbon_table").show(false) > Thread.sleep(10000) > } while (true) > } > }.start() > qry.awaitTermination() > > Expected Result: it should be executed successfully. > Actual Result: > // Exiting paste mode, now interpreting. > <console>:27: error: object CarbonStorePath is not a member of package org.apache.carbondata.core.util.path > import org.apache.carbondata.core.util.path.CarbonStorePath > ^ > <console>:54: error: not found: value CarbonStorePath > val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) |
Free forum by Nabble | Edit this page |