[ https://issues.apache.org/jira/browse/CARBONDATA-2198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] anubhav tarar reassigned CARBONDATA-2198: ----------------------------------------- Assignee: anubhav tarar > Streaming data to a table with bad_records_action as IGNORE throws ClassCastException > ------------------------------------------------------------------------------------- > > Key: CARBONDATA-2198 > URL: https://issues.apache.org/jira/browse/CARBONDATA-2198 > Project: CarbonData > Issue Type: Bug > Components: data-load > Affects Versions: 1.4.0 > Reporter: Geetika Gupta > Assignee: anubhav tarar > Priority: Minor > > Steps to reproduce: > /* > * Licensed to the Apache Software Foundation (ASF) under one or more > * contributor license agreements. See the NOTICE file distributed with > * this work for additional information regarding copyright ownership. > * The ASF licenses this file to You under the Apache License, Version 2.0 > * (the "License"); you may not use this file except in compliance with > * the License. You may obtain a copy of the License at > * > * [http://www.apache.org/licenses/LICENSE-2.0] > * > * Unless required by applicable law or agreed to in writing, software > * distributed under the License is distributed on an "AS IS" BASIS, > * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > * See the License for the specific language governing permissions and > * limitations under the License. > */ > package org.apache.carbondata.examples > import java.io.\{File, PrintWriter} > import java.net.ServerSocket > import org.apache.spark.sql.\{CarbonEnv, SparkSession} > import org.apache.spark.sql.streaming.\{ProcessingTime, StreamingQuery} > import org.apache.carbondata.core.constants.CarbonCommonConstants > import org.apache.carbondata.core.util.CarbonProperties > import org.apache.carbondata.core.util.path.\{CarbonStorePath, CarbonTablePath} > // scalastyle:off println > object CarbonStructuredStreamingExample { > def main(args: Array[String]) { > // setup paths > val rootPath = new File(this.getClass.getResource("/").getPath > + "../../../..").getCanonicalPath > val storeLocation = s"$rootPath/examples/spark2/target/store" > val warehouse = s"$rootPath/examples/spark2/target/warehouse" > val metastoredb = s"$rootPath/examples/spark2/target" > val streamTableName = s"stream_table" > CarbonProperties.getInstance() > .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") > import org.apache.spark.sql.CarbonSession._ > val spark = SparkSession > .builder() > .master("local") > .appName("CarbonStructuredStreamingExample") > .config("spark.sql.warehouse.dir", warehouse) > .getOrCreateCarbonSession(storeLocation, metastoredb) > spark.sparkContext.setLogLevel("ERROR") > val requireCreateTable = true > val useComplexDataType = false > if (requireCreateTable) { > // drop table if exists previously > spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }") > // Create target carbon table and populate with initial data > if (useComplexDataType) { > spark.sql( > s""" > |CREATE TABLE ${ streamTableName }(| > |id INT,| > |name STRING,| > |city STRING,| > |salary FLOAT,| > |file struct<school:array<string>, age:int>| > |)| > |STORED BY 'carbondata'| > |TBLPROPERTIES(| > |'streaming'='true', 'sort_columns'='name', 'dictionary_include'='city')| > |""".stripMargin) > } else { > spark.sql( > s"""| > |CREATE TABLE ${ streamTableName }(| > |id INT,| > |name STRING,| > |city STRING,| > |salary FLOAT| > |)| > |STORED BY 'carbondata'| > |TBLPROPERTIES(| > |'streaming'='true', 'sort_columns'='name')| > |""".stripMargin) > }| > val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark) > val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) > // streaming ingest > val serverSocket = new ServerSocket(7071) > val thread1 = startStreaming(spark, tablePath) > val thread2 = writeSocket(serverSocket) > System.out.println("type enter to interrupt streaming") > System.in.read() > thread1.interrupt() > thread2.interrupt() > serverSocket.close() > } > spark.sql(s"select * from $streamTableName").show > spark.stop() > System.out.println("streaming finished") > } > def showTableCount(spark: SparkSession, tableName: String): Thread = { > val thread = new Thread() { > override def run(): Unit = { > for (_ <- 0 to 1000) > { spark.sql(s"select count(*) from $tableName").show(truncate = false) Thread.sleep(1000 * 3) } > } > } > thread.start() > thread > } > def startStreaming(spark: SparkSession, tablePath: CarbonTablePath): Thread = { > val thread = new Thread() { > override def run(): Unit = { > var qry: StreamingQuery = null > try > { val readSocketDF = spark.readStream .format("socket") .option("host", "localhost") .option("port", 7071) .load() // Write data from socket stream to carbondata file qry = readSocketDF.writeStream .format("carbondata") .trigger(ProcessingTime("5 seconds")) .option("checkpointLocation", tablePath.getStreamingCheckpointDir) .option("bad_records_action", "ignore") .option("dbName", "default") .option("tableName", "stream_table") .start() qry.awaitTermination() } > catch > { case ex: Exception => ex.printStackTrace() println("Done reading and writing streaming data") } > finally > { qry.stop() } > } > } > thread.start() > thread > } > def writeSocket(serverSocket: ServerSocket): Thread = { > val thread = new Thread() { > override def run(): Unit = { > // wait for client to connection request and accept > val clientSocket = serverSocket.accept() > val socketWriter = new PrintWriter(clientSocket.getOutputStream()) > var index = 0 > for (_ <- 1 to 1000) { > // write 5 records per iteration > for (_ <- 0 to 1000) > { index = index + 1 socketWriter.println("null" + ",name_" + index + ",city_" + index + "," + (index * 10000.00).toString + ",school_" + index + ":school_" + index + index + "$" + index) } > socketWriter.flush() > Thread.sleep(1000) > } > socketWriter.close() > System.out.println("Socket closed") > } > } > thread.start() > thread > } > } > // scalastyle:on println > In the above example we are streaming data to table with bad_records_action as IGNORE, it throws ClassCastException. > > Here are the logs: > 18/02/23 16:09:50 ERROR StreamSegment: Executor task launch worker-0 Failed to append batch data to stream segment: /home/geetika/Workspace/incubator-carbondata/examples/spark2/target/store/default/stream_table/Fact/Part0/Segment_0 > java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double > at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:241) > at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305) > at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:99) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > 18/02/23 16:09:50 ERROR Utils: Aborting task > java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double > at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:241) > at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305) > at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:99) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > 18/02/23 16:09:50 ERROR CarbonAppendableStreamSink$: Executor task launch worker-0 Job job_20180223160950_0000 aborted. > 18/02/23 16:09:50 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) > org.apache.carbondata.streaming.CarbonStreamException: Task failed while writing rows > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:324) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:99) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double > at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:241) > at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305) > at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341) > at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317) > ... 8 more -- This message was sent by Atlassian JIRA (v7.6.3#76005) |
Free forum by Nabble | Edit this page |