[ https://issues.apache.org/jira/browse/CARBONDATA-2198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Geetika Gupta updated CARBONDATA-2198:
--------------------------------------
Description:

Steps to reproduce: run the following example (CarbonStructuredStreamingExample):

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.carbondata.examples

import java.io.{File, PrintWriter}
import java.net.ServerSocket

import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}

// scalastyle:off println
object CarbonStructuredStreamingExample {

  def main(args: Array[String]) {
    // setup paths
    val rootPath = new File(this.getClass.getResource("/").getPath
      + "../../../..").getCanonicalPath
    val storeLocation = s"$rootPath/examples/spark2/target/store"
    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
    val metastoredb = s"$rootPath/examples/spark2/target"
    val streamTableName = s"stream_table"

    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")

    import org.apache.spark.sql.CarbonSession._
    val spark = SparkSession
      .builder()
      .master("local")
      .appName("CarbonStructuredStreamingExample")
      .config("spark.sql.warehouse.dir", warehouse)
      .getOrCreateCarbonSession(storeLocation, metastoredb)

    spark.sparkContext.setLogLevel("ERROR")

    val requireCreateTable = true
    val useComplexDataType = false

    if (requireCreateTable) {
      // drop table if exists previously
      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
      // Create target carbon table and populate with initial data
      if (useComplexDataType) {
        spark.sql(
          s"""
             | CREATE TABLE ${ streamTableName }(
             | id INT,
             | name STRING,
             | city STRING,
             | salary FLOAT,
             | file struct<school:array<string>, age:int>
             | )
             | STORED BY 'carbondata'
             | TBLPROPERTIES(
             | 'streaming'='true', 'sort_columns'='name', 'dictionary_include'='city')
             | """.stripMargin)
      } else {
        spark.sql(
          s"""
             | CREATE TABLE ${ streamTableName }(
             | id INT,
             | name STRING,
             | city STRING,
             | salary FLOAT
             | )
             | STORED BY 'carbondata'
             | TBLPROPERTIES(
             | 'streaming'='true', 'sort_columns'='name')
             | """.stripMargin)
      }

      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
      val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
      // streaming ingest
      val serverSocket = new ServerSocket(7071)
      val thread1 = startStreaming(spark, tablePath)
      val thread2 = writeSocket(serverSocket)

      System.out.println("type enter to interrupt streaming")
      System.in.read()
      thread1.interrupt()
      thread2.interrupt()
      serverSocket.close()
    }

    spark.sql(s"select * from $streamTableName").show

    spark.stop()
    System.out.println("streaming finished")
  }

  def showTableCount(spark: SparkSession, tableName: String): Thread = {
    val thread = new Thread() {
      override def run(): Unit = {
        for (_ <- 0 to 1000) {
          spark.sql(s"select count(*) from $tableName").show(truncate = false)
          Thread.sleep(1000 * 3)
        }
      }
    }
    thread.start()
    thread
  }

  def startStreaming(spark: SparkSession, tablePath: CarbonTablePath): Thread = {
    val thread = new Thread() {
      override def run(): Unit = {
        var qry: StreamingQuery = null
        try {
          val readSocketDF = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 7071)
            .load()

          // Write data from socket stream to carbondata file
          qry = readSocketDF.writeStream
            .format("carbondata")
            .trigger(ProcessingTime("5 seconds"))
            .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
            .option("bad_records_action", "ignore")
            .option("dbName", "default")
            .option("tableName", "stream_table")
            .start()

          qry.awaitTermination()
        } catch {
          case ex: Exception =>
            ex.printStackTrace()
            println("Done reading and writing streaming data")
        } finally {
          qry.stop()
        }
      }
    }
    thread.start()
    thread
  }

  def writeSocket(serverSocket: ServerSocket): Thread = {
    val thread = new Thread() {
      override def run(): Unit = {
        // wait for client to connection request and accept
        val clientSocket = serverSocket.accept()
        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
        var index = 0
        for (_ <- 1 to 1000) {
          // write a batch of records; the first field is the literal string "null",
          // which is not a valid value for the INT id column
          for (_ <- 0 to 1000) {
            index = index + 1
            socketWriter.println("null" + ",name_" + index
              + ",city_" + index
              + "," + (index * 10000.00).toString
              + ",school_" + index + ":school_" + index + index + "$" + index)
          }
          socketWriter.flush()
          Thread.sleep(1000)
        }
        socketWriter.close()
        System.out.println("Socket closed")
      }
    }
    thread.start()
    thread
  }
}
// scalastyle:on println

In the above example, data is streamed into the table with bad_records_action set to IGNORE (see the writeStream options), yet the streaming write fails with a ClassCastException instead of ignoring the bad records.
Here are the logs:

18/02/23 16:09:50 ERROR StreamSegment: Executor task launch worker-0 Failed to append batch data to stream segment: /home/geetika/Workspace/incubator-carbondata/examples/spark2/target/store/default/stream_table/Fact/Part0/Segment_0
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double
  at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:241)
  at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244)
  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315)
  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317)
  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228)
  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:99)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
18/02/23 16:09:50 ERROR Utils: Aborting task
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double
  at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:241)
  at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244)
  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315)
  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317)
  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228)
  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:99)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
18/02/23 16:09:50 ERROR CarbonAppendableStreamSink$: Executor task launch worker-0 Job job_20180223160950_0000 aborted.
18/02/23 16:09:50 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.carbondata.streaming.CarbonStreamException: Task failed while writing rows
  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:324)
  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228)
  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:99)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double
  at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:241)
  at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244)
  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315)
  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
  at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317)
  ... 8 more
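The first frame in each trace is CarbonStreamRecordWriter.write, which suggests the raw string value reaches the typed write path instead of being filtered out by the bad-records handling. As a rough sketch of the behaviour bad_records_action=IGNORE implies (hypothetical helper names, not the actual CarbonData implementation), a value that fails conversion would mark the row as bad so it is skipped rather than cast:

// Hypothetical sketch only; names and structure are illustrative, not CarbonData internals.
// With bad_records_action=IGNORE, a field that cannot be converted to the column's data type
// should flag the row as a bad record so it can be skipped, rather than leaving the raw
// String in place to be cast later.
object IgnoreActionSketch {
  def convertDouble(raw: String): Option[java.lang.Double] =
    try Some(java.lang.Double.valueOf(raw))          // "10000.0" -> Some(10000.0)
    catch { case _: NumberFormatException => None }  // "null"    -> None (bad record)

  def main(args: Array[String]): Unit = {
    val rows = Seq("null,name_1,city_1,10000.0", "2,name_2,city_2,20000.0")
    val kept = rows.filter { row =>
      val fields = row.split(",")
      // keep the row only if every numeric field converts cleanly (id and salary here)
      convertDouble(fields(0)).isDefined && convertDouble(fields(3)).isDefined
    }
    println(kept)  // List(2,name_2,city_2,20000.0) -- the bad row is ignored, no exception
  }
}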
> Streaming data to a table with bad_records_action as IGNORE throws ClassCastException
> --------------------------------------------------------------------------------------
>
>                 Key: CARBONDATA-2198
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-2198
>             Project: CarbonData
>          Issue Type: Bug
>          Components: data-load
>    Affects Versions: 1.4.0
>            Reporter: Geetika Gupta
>            Priority: Minor
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)