[ https://issues.apache.org/jira/browse/CARBONDATA-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305133#comment-16305133 ] Bhavya Aggarwal commented on CARBONDATA-1790: --------------------------------------------- This bug is an invalid bug as the steps give violates the current design, in current design we can only have a single stream for a table if we are opening up sockets multiple time the offset error will definitely come as sockets are not replayable. This is default spark behavior, if you want to test it you can open a file stream and then try to do batch load in between. I have written a small code to verify whether batch loads work with the stream and the code is given below. You can keep moving different files into the directory from which you are creating the stream in this case it is /home/bhavya/stream. Please note that you should move the file to this folder after the stream is started. Both the streaming data and batch data is consumed concurrently without any issue. import java.io.{File, PrintWriter} import java.net.ServerSocket import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, StreamingQuery} import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{CarbonEnv, SparkSession} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} // scalastyle:off println object StreamExample { def main(args: Array[String]) { // setup paths val rootPath = new File(this.getClass.getResource("/").getPath + "../../../..").getCanonicalPath val storeLocation = s"$rootPath/examples/spark2/target/store" // // val storeLocation = s"hdfs://localhost:54311/stream" val warehouse = s"$rootPath/examples/spark2/target/warehouse" val metastoredb = s"$rootPath/examples/spark2/target" val streamTableName = s"stream_table_2" CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") import org.apache.spark.sql.CarbonSession._ val spark = SparkSession .builder() .master("local") .appName("StreamExample") .config("spark.sql.warehouse.dir", warehouse) .getOrCreateCarbonSession(storeLocation, metastoredb) spark.sparkContext.setLogLevel("ERROR") // drop table if exists previously spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }") //Create table spark.sql( s""" | CREATE TABLE ${ streamTableName }( | id INT, | name STRING, | city STRING, | salary FLOAT | ) | STORED BY 'carbondata' | TBLPROPERTIES( | 'streaming'='true', 'sort_columns'='name') | """.stripMargin) val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark) val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) val thread1 = startStreaming(spark, tablePath) val thread3 = showTableCount(spark, streamTableName) Thread.sleep(15000) val path = s"/home/bhavya/carbonData/test.csv" spark.sql( s""" | LOAD DATA LOCAL INPATH '$path' | INTO TABLE $streamTableName | OPTIONS('HEADER'='true') """.stripMargin) Thread.sleep(15000) spark.sql( s""" | LOAD DATA LOCAL INPATH '$path' | INTO TABLE $streamTableName | OPTIONS('HEADER'='true') """.stripMargin) System.out.println("type enter to interrupt streaming") System.in.read() thread1.interrupt() thread3.interrupt() } spark.sql(s"select count(*) from ${ streamTableName }").show() spark.sql(s"select * from ${ streamTableName }").show(500, truncate = false) spark.stop() System.out.println("streaming finished") } def showTableCount(spark: SparkSession, tableName: String): Thread = { val thread = new Thread() { override def run(): Unit = { for (_ <- 0 to 100) { spark.sql(s"select count(*) from $tableName").show(truncate = false) Thread.sleep(1000 * 3) } } } thread.start() thread } def startStreaming(spark: SparkSession, tablePath: CarbonTablePath): Thread = { val thread = new Thread() { override def run(): Unit = { var qry: StreamingQuery = null try { val userSchema = StructType( Array(StructField("id", StringType), StructField("name", StringType), StructField("city", StringType), StructField("salary", StringType))) val readSocketDF = spark.readStream .format("csv") .option("path", "/home/bhavya/stream") .option("sep",",") .schema(userSchema) .load() import spark.implicits._ // Write data from socket stream to carbondata file qry = readSocketDF.map{x => x.get(0) + "," + x.get(1) + "," + x.get(2) + "," + x.get(3) }.writeStream .format("carbondata") .trigger(ProcessingTime("1 seconds")) .option("checkpointLocation", tablePath.getStreamingCheckpointDir) .option("dbName", "default") .option("tableName", "stream_table_2") .start() qry.awaitTermination() } catch { case ex => ex.printStackTrace() println("Done reading and writing streaming data") } finally { qry.stop() } } } thread.start() thread } > (Carbon1.3.0 - Streaming) Data load in Stream Segment fails if batch load is performed in between the streaming > --------------------------------------------------------------------------------------------------------------- > > Key: CARBONDATA-1790 > URL: https://issues.apache.org/jira/browse/CARBONDATA-1790 > Project: CarbonData > Issue Type: Bug > Components: data-query > Affects Versions: 1.3.0 > Environment: 3 node ant cluster > Reporter: Ramakrishna S > Assignee: Bhavya Aggarwal > Labels: DFX > > Steps : > 1. Create a streaming table and do a batch load > 2. Set up the Streaming , so that it does streaming in chunk of 1000 records 20 times > 3. Do another batch load on the table > 4. Do one more time streaming > +-------------+------------+--------------------------+--------------------------+--------------+------------+--+ > | Segment Id | Status | Load Start Time | Load End Time | File Format | Merged To | > +-------------+------------+--------------------------+--------------------------+--------------+------------+--+ > | 2 | Success | 2017-11-21 21:42:36.77 | 2017-11-21 21:42:40.396 | COLUMNAR_V3 | NA | > | 1 | Streaming | 2017-11-21 21:40:46.2 | NULL | ROW_V1 | NA | > | 0 | Success | 2017-11-21 21:40:39.782 | 2017-11-21 21:40:43.168 | COLUMNAR_V3 | NA | > +-------------+------------+--------------------------+--------------------------+--------------+------------+--+ > *+Expected:+* Data should be loaded > *+Actual+* : Data load fiails > 1. One addition offset file is created(marked in bold) > -rw-r--r-- 2 root users 62 2017-11-21 21:40 /user/hive/warehouse/Ram/default/stream_table5/.streaming/checkpoint/offsets/0 > -rw-r--r-- 2 root users 63 2017-11-21 21:40 /user/hive/warehouse/Ram/default/stream_table5/.streaming/checkpoint/offsets/1 > -rw-r--r-- 2 root users 63 2017-11-21 21:42 /user/hive/warehouse/Ram/default/stream_table5/.streaming/checkpoint/offsets/10 > -rw-r--r-- 2 root users 63 2017-11-21 21:40 /user/hive/warehouse/Ram/default/stream_table5/.streaming/checkpoint/offsets/2 > -rw-r--r-- 2 root users 63 2017-11-21 21:41 /user/hive/warehouse/Ram/default/stream_table5/.streaming/checkpoint/offsets/3 > -rw-r--r-- 2 root users 64 2017-11-21 21:41 /user/hive/warehouse/Ram/default/stream_table5/.streaming/checkpoint/offsets/4 > -rw-r--r-- 2 root users 64 2017-11-21 21:41 /user/hive/warehouse/Ram/default/stream_table5/.streaming/checkpoint/offsets/5 > -rw-r--r-- 2 root users 64 2017-11-21 21:41 /user/hive/warehouse/Ram/default/stream_table5/.streaming/checkpoint/offsets/6 > -rw-r--r-- 2 root users 64 2017-11-21 21:41 /user/hive/warehouse/Ram/default/stream_table5/.streaming/checkpoint/offsets/7 > -rw-r--r-- 2 root users 64 2017-11-21 21:41 /user/hive/warehouse/Ram/default/stream_table5/.streaming/checkpoint/offsets/8 > *-rw-r--r-- 2 root users 63 2017-11-21 21:42 /user/hive/warehouse/Ram/default/stream_table5/.streaming/checkpoint/offsets/9* > 2. Following error thrown: > === Streaming Query === > Identifier: [id = 3a5334bc-d471-4676-b6ce-f21105d491d1, runId = b2be9f97-8141-46be-89db-9a0f98d13369] > Current Offsets: {org.apache.spark.sql.execution.streaming.TextSocketSource@14c45193: 1000} > Current State: ACTIVE > Thread State: RUNNABLE > Logical Plan: > org.apache.spark.sql.execution.streaming.TextSocketSource@14c45193 > at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:284) > at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177) > Caused by: java.lang.RuntimeException: Offsets committed out of order: 20019 followed by 1000 > at scala.sys.package$.error(package.scala:27) > at org.apache.spark.sql.execution.streaming.TextSocketSource.commit(socket.scala:151) > at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$2$$anonfun$apply$mcV$sp$4.apply(StreamExecution.scala:421) > at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$2$$anonfun$apply$mcV$sp$4.apply(StreamExecution.scala:420) > at scala.collection.Iterator$class.foreach(Iterator.scala:893) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25) > at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$2.apply$mcV$sp(StreamExecution.scala:420) > at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$2.apply(StreamExecution.scala:404) > at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$2.apply(StreamExecution.scala:404) > at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262) > at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46) > at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch(StreamExecution.scala:404) > at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:250) > at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244) > at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244) > at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262) > at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46) > at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:244) > at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43) > at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:239) > ... 1 more > Done reading and writing streaming data > Socket closed -- This message was sent by Atlassian JIRA (v6.4.14#64029) |
Free forum by Nabble | Edit this page |