Github user zzcclp commented on the issue:
https://github.com/apache/carbondata/pull/1840

retest this please

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1840

Build Success with Spark 2.1.0, Please check CI
http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3021/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1840

Build Success with Spark 2.2.1, Please check CI
http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1790/

---
Github user chenliang613 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1840#discussion_r162838020

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/DStreamWithBatchTableExample.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{CarbonEnv, SaveMode, SparkSession}
+import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+
+/**
+ * This example introduces how to use CarbonData batch load to integrate
+ * with Spark Streaming (DStream, not Spark Structured Streaming).
+ */
+// scalastyle:off println
+
+case class DStreamData(id: Int, name: String, city: String, salary: Float)
+
+object DStreamWithBatchTableExample {
--- End diff --

I suggest using example names like:
1. CarbonSparkStreamingExample
2. Change "StreamExample" to "CarbonStructuredStreamingExample"

---
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1840#discussion_r162839907

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/DStreamWithBatchTableExample.scala ---
@@ -0,0 +1,228 @@
[... diff context as quoted above ...]
+object DStreamWithBatchTableExample {
--- End diff --

OK, but I think it's better to use 'CarbonBatchSparkStreamingExample' instead of 'CarbonSparkStreamingExample'. I will add another example called 'CarbonStreamSparkStreamingExample' after the Carbon stream table integrates with Spark Streaming.

---
Github user chenliang613 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1840#discussion_r162840147

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/DStreamWithBatchTableExample.scala ---
@@ -0,0 +1,228 @@
[... diff context as quoted above ...]
+  def main(args: Array[String]): Unit = {
+
+    // setup paths
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/examples/spark2/target/store"
--- End diff --

I suggest using this code instead:
val spark = ExampleUtils.createCarbonSession("xxexamplenamexx")

---
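For reference, a minimal sketch of what the suggested setup could look like. The helper ExampleUtils.createCarbonSession is only named in the comment above; its package (assumed here to be the same org.apache.carbondata.examples package, so no import is needed) and exact signature are assumptions, and the point is simply that it replaces the manual path wiring:

object CarbonBatchSparkStreamingExample {

  def main(args: Array[String]): Unit = {
    // replaces the manual rootPath/storeLocation/warehouse/metastoredb wiring
    val spark = ExampleUtils.createCarbonSession("CarbonBatchSparkStreamingExample")
    spark.sparkContext.setLogLevel("WARN")
    // ... rest of the example unchanged ...
    spark.stop()
  }
}

---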
Github user chenliang613 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1840#discussion_r162840232

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/DStreamWithBatchTableExample.scala ---
@@ -0,0 +1,228 @@
[... diff context as quoted above ...]
+      spark.sql(
+        s"""
+           | CREATE TABLE ${ streamTableName }(
+           | id INT,
+           | name STRING,
+           | city STRING,
+           | salary FLOAT
+           | )
+           | STORED BY 'carbondata'
+           | TBLPROPERTIES(
+           | 'sort_columns'='name',
+           | 'dictionary_include'='city',
--- End diff --

Please double-check whether the properties below need to be added for this example:

+ | 'sort_columns'='name',
+ | 'dictionary_include'='city',
+ | 'MAJOR_COMPACTION_SIZE'='64',
+ | 'AUTO_LOAD_MERGE'='true',
+ | 'COMPACTION_LEVEL_THRESHOLD'='4,10')

---
Github user chenliang613 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1840#discussion_r162841010

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/DStreamWithBatchTableExample.scala ---
@@ -0,0 +1,228 @@
[... license header, package, imports, and case class as quoted above ...]
+object DStreamWithBatchTableExample {
+
+  def main(args: Array[String]): Unit = {
+
+    // setup paths
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/examples/spark2/target/store"
+    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+    val metastoredb = s"$rootPath/examples/spark2/target"
+    val checkpointPath =
+      s"$rootPath/examples/spark2/target/spark_streaming_cp_" +
+      System.currentTimeMillis().toString()
+    val streamTableName = s"dstream_batch_table"
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+    import org.apache.spark.sql.CarbonSession._
+    val spark = SparkSession
+      .builder()
+      .master("local[4]")
+      .appName("DStreamWithBatchTableExample")
+      .config("spark.sql.warehouse.dir", warehouse)
+      .getOrCreateCarbonSession(storeLocation, metastoredb)
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    val requireCreateTable = true
+
+    if (requireCreateTable) {
+      // drop table if it exists from a previous run
+      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+      // create the target carbon table and populate it with initial data;
+      // set AUTO_LOAD_MERGE to true to compact segments automatically
+      spark.sql(
+        s"""
+           | CREATE TABLE ${ streamTableName }(
+           | id INT,
+           | name STRING,
+           | city STRING,
+           | salary FLOAT
+           | )
+           | STORED BY 'carbondata'
+           | TBLPROPERTIES(
+           | 'sort_columns'='name',
+           | 'dictionary_include'='city',
+           | 'MAJOR_COMPACTION_SIZE'='64',
+           | 'AUTO_LOAD_MERGE'='true',
+           | 'COMPACTION_LEVEL_THRESHOLD'='4,10')
+           | """.stripMargin)
+
+      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
+      val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+      // batch load
+      val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
+      spark.sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$path'
+           | INTO TABLE $streamTableName
+           | OPTIONS('HEADER'='true')
+         """.stripMargin)
+
+      // streaming ingest
+      val serverSocket = new ServerSocket(7071)
+      val thread1 = writeSocket(serverSocket)
+      val thread2 = showTableCount(spark, streamTableName)
+      val ssc = startStreaming(spark, streamTableName, tablePath, checkpointPath)
+      // wait for the stop signal to stop the Spark Streaming app
+      waitForStopSignal(ssc)
+      // the Spark Streaming app has to be started in the main thread,
+      // otherwise it will encounter a not-serializable exception
+      ssc.start()
+      ssc.awaitTermination()
+      thread1.interrupt()
+      thread2.interrupt()
+      serverSocket.close()
+    }
+
+    spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
+
+    spark.sql(s"select * from ${ streamTableName }").show(100, truncate = false)
+
+    // record(id = 100000001) comes from batch segment_0
+    // record(id = 1) comes from stream segment_1
+    spark.sql(s"select * " +
+      s"from ${ streamTableName } " +
+      s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
+
+    // no filter
+    spark.sql(s"select * " +
+      s"from ${ streamTableName } " +
+      s"where id < 10 limit 100").show(100, truncate = false)
+
+    // show segments
+    spark.sql(s"SHOW SEGMENTS FOR TABLE ${streamTableName}").show(false)
+
+    spark.stop()
--- End diff --

Add a drop table statement here.

---
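The helpers writeSocket, showTableCount, waitForStopSignal, and startStreaming are referenced by the quoted main() but not shown in this thread. As a rough sketch of the core technique the example demonstrates (batch-loading each DStream micro-batch into the carbon table), startStreaming could look like the following; the method name and parameters come from the quoted call site, everything else is illustrative:

  def startStreaming(spark: SparkSession, tableName: String,
      tablePath: CarbonTablePath, checkpointPath: String): StreamingContext = {
    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
    ssc.checkpoint(checkpointPath)
    // read lines of the form "id,name,city,salary" from the socket opened in main()
    val lines = ssc.socketTextStream("localhost", 7071)
    lines.foreachRDD { (rdd: RDD[String], time: Time) =>
      // convert each micro-batch to a DataFrame of DStreamData rows
      val df = spark.createDataFrame(rdd.map { line =>
        val fields = line.split(",")
        DStreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat)
      })
      // append the micro-batch to the carbon table as a normal batch load,
      // producing one new segment per batch (compacted via AUTO_LOAD_MERGE)
      df.write
        .format("carbondata")
        .option("tableName", tableName)
        .mode(SaveMode.Append)
        .save()
    }
    ssc
  }

---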
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1840#discussion_r162841758

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/DStreamWithBatchTableExample.scala ---
@@ -0,0 +1,228 @@
[... diff context as quoted above ...]
+    // setup paths
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/examples/spark2/target/store"
--- End diff --

OK, but there is a bug in ExampleUtils.createCarbonSession: the 'appName' parameter is not set as the Spark app name. I will fix this first.

---
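A sketch of what that fix could look like, under the assumption that createCarbonSession builds the session the same way the quoted example does (the actual helper is not shown in this thread); the only line that matters is .appName(appName):

  def createCarbonSession(appName: String): SparkSession = {
    import org.apache.spark.sql.CarbonSession._
    val rootPath = new File(this.getClass.getResource("/").getPath
      + "../../../..").getCanonicalPath
    val storeLocation = s"$rootPath/examples/spark2/target/store"
    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
    val metastoredb = s"$rootPath/examples/spark2/target"
    SparkSession
      .builder()
      .master("local[4]")
      .appName(appName) // the fix: previously the parameter was ignored
      .config("spark.sql.warehouse.dir", warehouse)
      .getOrCreateCarbonSession(storeLocation, metastoredb)
  }

---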
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1840#discussion_r162851669

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/DStreamWithBatchTableExample.scala ---
@@ -0,0 +1,228 @@
[... diff context as quoted above ...]
+    // show segments
+    spark.sql(s"SHOW SEGMENTS FOR TABLE ${streamTableName}").show(false)
+
+    spark.stop()
--- End diff --

Done

---
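The requested change would presumably land at the end of main(), mirroring the example's existing SQL style; a minimal sketch:

    // show segments
    spark.sql(s"SHOW SEGMENTS FOR TABLE ${streamTableName}").show(false)

    // clean up the example table before stopping the session
    spark.sql(s"DROP TABLE IF EXISTS ${streamTableName}")

    spark.stop()

---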
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1840#discussion_r162851674

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/DStreamWithBatchTableExample.scala ---
@@ -0,0 +1,228 @@
[... diff context as quoted above ...]
+    // setup paths
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/examples/spark2/target/store"
--- End diff --

Done

---
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1840#discussion_r162851688

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/DStreamWithBatchTableExample.scala ---
@@ -0,0 +1,228 @@
[... diff context as quoted above ...]
+object DStreamWithBatchTableExample {
--- End diff --

Done

---
Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1840#discussion_r162852531

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/DStreamWithBatchTableExample.scala ---
@@ -0,0 +1,228 @@
[... diff context as quoted above ...]
+           | STORED BY 'carbondata'
+           | TBLPROPERTIES(
+           | 'sort_columns'='name',
+           | 'dictionary_include'='city',
--- End diff --

Removed 'MAJOR_COMPACTION_SIZE'; it's OK to add the other four properties for this example.

---
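Putting that resolution together, the CREATE TABLE in the example would presumably end up as follows, keeping AUTO_LOAD_MERGE and COMPACTION_LEVEL_THRESHOLD and dropping MAJOR_COMPACTION_SIZE (the merged PR may differ in detail):

      spark.sql(
        s"""
           | CREATE TABLE ${ streamTableName }(
           | id INT,
           | name STRING,
           | city STRING,
           | salary FLOAT
           | )
           | STORED BY 'carbondata'
           | TBLPROPERTIES(
           | 'sort_columns'='name',
           | 'dictionary_include'='city',
           | 'AUTO_LOAD_MERGE'='true',
           | 'COMPACTION_LEVEL_THRESHOLD'='4,10')
           | """.stripMargin)

---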
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1840

Build Failed with Spark 2.2.1, Please check CI
http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1792/

---
Github user zzcclp commented on the issue:
https://github.com/apache/carbondata/pull/1840

retest this please

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1840

SDV Build Success, Please check CI
http://144.76.159.231:8080/job/ApacheSDVTests/3023/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1840

Build Success with Spark 2.2.1, Please check CI
http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1793/

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1840

Build Success with Spark 2.1.0, Please check CI
http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3024/

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1840

SDV Build Success, Please check CI
http://144.76.159.231:8080/job/ApacheSDVTests/3024/

---
Github user zzcclp commented on the issue:
https://github.com/apache/carbondata/pull/1840

@QiangCai @jackylk @chenliang613 please review again, thanks.

---