Geetika Gupta created CARBONDATA-2198:
----------------------------------------- Summary: Streaming data to a table with bad_records_action as IGNORE throws ClassCastException Key: CARBONDATA-2198 URL: https://issues.apache.org/jira/browse/CARBONDATA-2198 Project: CarbonData Issue Type: Bug Components: data-load Affects Versions: 1.4.0 Reporter: Geetika Gupta Steps to reproduce: /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.carbondata.examples import java.io.\{File, PrintWriter} import java.net.ServerSocket import org.apache.spark.sql.\{CarbonEnv, SparkSession} import org.apache.spark.sql.streaming.\{ProcessingTime, StreamingQuery} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.\{CarbonStorePath, CarbonTablePath} // scalastyle:off println object CarbonStructuredStreamingExample { def main(args: Array[String]) { // setup paths val rootPath = new File(this.getClass.getResource("/").getPath + "../../../..").getCanonicalPath val storeLocation = s"$rootPath/examples/spark2/target/store" val warehouse = s"$rootPath/examples/spark2/target/warehouse" val metastoredb = s"$rootPath/examples/spark2/target" val streamTableName = s"stream_table" CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") import org.apache.spark.sql.CarbonSession._ val spark = SparkSession .builder() .master("local") .appName("CarbonStructuredStreamingExample") .config("spark.sql.warehouse.dir", warehouse) .getOrCreateCarbonSession(storeLocation, metastoredb) spark.sparkContext.setLogLevel("ERROR") val requireCreateTable = true val useComplexDataType = false if (requireCreateTable) { // drop table if exists previously spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }") // Create target carbon table and populate with initial data if (useComplexDataType) { spark.sql( s""" | CREATE TABLE ${ streamTableName }( | id INT, | name STRING, | city STRING, | salary FLOAT, | file struct<school:array<string>, age:int> | ) | STORED BY 'carbondata' | TBLPROPERTIES( | 'streaming'='true', 'sort_columns'='name', 'dictionary_include'='city') | """.stripMargin) } else { spark.sql( s""" | CREATE TABLE ${ streamTableName }( | id INT, | name STRING, | city STRING, | salary FLOAT | ) | STORED BY 'carbondata' | TBLPROPERTIES( | 'streaming'='true', 'sort_columns'='name') | """.stripMargin) } val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark) val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) // streaming ingest val serverSocket = new ServerSocket(7071) val thread1 = startStreaming(spark, tablePath) val thread2 = writeSocket(serverSocket) System.out.println("type enter to interrupt streaming") System.in.read() thread1.interrupt() thread2.interrupt() serverSocket.close() } spark.sql(s"select * from $streamTableName").show spark.stop() System.out.println("streaming finished") } def showTableCount(spark: SparkSession, tableName: String): Thread = { val thread = new Thread() { override def run(): Unit = { for (_ <- 0 to 1000) { spark.sql(s"select count(*) from $tableName").show(truncate = false) Thread.sleep(1000 * 3) } } } thread.start() thread } def startStreaming(spark: SparkSession, tablePath: CarbonTablePath): Thread = { val thread = new Thread() { override def run(): Unit = { var qry: StreamingQuery = null try { val readSocketDF = spark.readStream .format("socket") .option("host", "localhost") .option("port", 7071) .load() // Write data from socket stream to carbondata file qry = readSocketDF.writeStream .format("carbondata") .trigger(ProcessingTime("5 seconds")) .option("checkpointLocation", tablePath.getStreamingCheckpointDir) .option("bad_records_action", "ignore") .option("dbName", "default") .option("tableName", "stream_table") .start() qry.awaitTermination() } catch { case ex: Exception => ex.printStackTrace() println("Done reading and writing streaming data") } finally { qry.stop() } } } thread.start() thread } def writeSocket(serverSocket: ServerSocket): Thread = { val thread = new Thread() { override def run(): Unit = { // wait for client to connection request and accept val clientSocket = serverSocket.accept() val socketWriter = new PrintWriter(clientSocket.getOutputStream()) var index = 0 for (_ <- 1 to 1000) { // write 5 records per iteration for (_ <- 0 to 1000) { index = index + 1 socketWriter.println("null" + ",name_" + index + ",city_" + index + "," + (index * 10000.00).toString + ",school_" + index + ":school_" + index + index + "$" + index) } socketWriter.flush() Thread.sleep(1000) } socketWriter.close() System.out.println("Socket closed") } } thread.start() thread } } // scalastyle:on println In the above example we are streaming data to table with bad_records_action as IGNORE, it throws ClassCastException. Here are the logs: 18/02/23 16:09:50 ERROR StreamSegment: Executor task launch worker-0 Failed to append batch data to stream segment: /home/geetika/Workspace/incubator-carbondata/examples/spark2/target/store/default/stream_table/Fact/Part0/Segment_0 java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:241) at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 18/02/23 16:09:50 ERROR Utils: Aborting task java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:241) at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 18/02/23 16:09:50 ERROR CarbonAppendableStreamSink$: Executor task launch worker-0 Job job_20180223160950_0000 aborted. 18/02/23 16:09:50 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) org.apache.carbondata.streaming.CarbonStreamException: Task failed while writing rows at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:324) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:241) at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341) at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317) ... 8 more -- This message was sent by Atlassian JIRA (v7.6.3#76005) |
Free forum by Nabble | Edit this page |