GitHub user jackylk opened a pull request:
https://github.com/apache/carbondata/pull/2328

Support StreamSQL for streaming job

Currently, users need to write a Spark Streaming application to use the carbon streaming ingest feature, which is not easy for some users. By providing StreamSQL, users can manage streaming jobs more easily.

1. CREATE STREAM SOURCE

```SQL
CREATE STREAM SOURCE iot.sensor_event(
  time LONG,
  device_id INT,
  temperature double,
  humidity double)
TBLPROPERTIES(
  'streaming'='source',
  'format'='kafka',
  'kafka.bootstrap.servers'='host1:port1,host2:port2',
  'topic'='update')
```

2. CREATE SINK TABLE

```SQL
CREATE TABLE iot.room_temperature(
  time LONG,
  room_id INT,
  temperature double)
STORED AS carbondata
TBLPROPERTIES('streaming'='sink')  -- compatible with 'true'
```

3. CREATE STREAM (start a streaming job)

```SQL
CREATE STREAM ON TABLE iot.room_temperature
STMPROPERTIES(
  'trigger'='ProcessingTime/OneTimeTrigger',
  'interval'='5 seconds')
AS
  SELECT time, lookup_room_id(device_id) AS room_id, temperature
  FROM iot.sensor_event
  WHERE temperature BETWEEN 0 AND 40
```

## Streaming Job Management

1. SHOW STREAMS

```SQL
SHOW STREAMS [ON TABLE dbName.tableName]
```

It will print:

| JobId              | Status  | Source | Sink | Start Time          | Time Elapsed |
| ------------------ | ------- | ------ | ---- | ------------------- | ------------ |
| jkfldsjkad-fkwelj0 | Started | device | fact | 2018-02-03 14:32:42 | 10d2h32m     |

2. KILL STREAM

```SQL
KILL STREAM ON TABLE dbName.tableName
```

- [ ] Any interfaces changed?
- [ ] Any backward compatibility impacted?
- [ ] Document update required?
- [ ] Testing done
      Please provide details on:
      - Whether new unit test cases have been added or why no new tests are required?
      - How it is tested? Please attach test report.
      - Is it a performance related change? Please attach the performance test report.
      - Any additional information to help reviewers in testing this change.
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jackylk/incubator-carbondata stream

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2328.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2328

----

commit 2482c0367deb17b1d46b7676a1339d42e8329456
Author: Jacky Li <jacky.likun@...>
Date:   2018-05-21T13:49:33Z

    support StreamSQL

----

---
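For context, the statements above replace hand-written Structured Streaming code. Below is a rough, hypothetical sketch of what a `CREATE STREAM` like the one above corresponds to in plain Spark 2.2 code; the JSON wire format, the carbondata sink options, the checkpoint path, and the `lookup_room_id` UDF registration are all illustrative assumptions, not what this PR actually generates (the spark-sql-kafka connector is assumed to be on the classpath):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("StreamSQL-sketch").getOrCreate()

// Schema of iot.sensor_event, taken from the CREATE STREAM SOURCE statement
val sensorSchema = new StructType()
  .add("time", LongType).add("device_id", IntegerType)
  .add("temperature", DoubleType).add("humidity", DoubleType)

// The TBLPROPERTIES of the source table become readStream options
// (assumes events arrive as JSON; the PR may use a different wire format)
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "update")
  .load()
  .select(from_json(col("value").cast("string"), sensorSchema).as("e"))
  .select("e.*")

// The AS SELECT clause becomes a transformation on the streaming DataFrame;
// lookup_room_id is assumed to be a UDF registered elsewhere
events.createOrReplaceTempView("sensor_event")
val transformed = spark.sql(
  """SELECT time, lookup_room_id(device_id) AS room_id, temperature
    |FROM sensor_event WHERE temperature BETWEEN 0 AND 40""".stripMargin)

// STMPROPERTIES map to the trigger; the sink table is the write target
// ("carbondata" sink format and checkpoint path are illustrative)
val job = transformed.writeStream
  .format("carbondata")
  .option("dbName", "iot")
  .option("tableName", "room_temperature")
  .option("checkpointLocation", "/tmp/room_temperature_ckpt")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()
```

---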
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2328#discussion_r189606530

--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala ---

@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.stream
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.streaming.StreamingQuery
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.spark.StreamingOption
+import org.apache.carbondata.streaming.CarbonStreamException
+
+object StreamJobManager {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  private val jobs = mutable.Map[String, StreamJobDesc]()

--- End diff --

Need to use a concurrent hash map.

---
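The reviewer's point is that `jobs` is read and written from multiple threads (the DDL command thread and the streaming-job thread), so an unsynchronized `mutable.Map` can race. A minimal sketch of the suggested fix, with `StreamJobDesc` stubbed down to a few fields for illustration (the PR's real descriptor carries more state):

```scala
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._

// Stubbed descriptor; the PR's real StreamJobDesc also carries the
// streaming query handle, source info, start time, etc.
case class StreamJobDesc(jobId: String, sinkDb: String, sinkTable: String)

object ConcurrentStreamJobRegistry {
  // Thread-safe registry: put/get/remove are atomic without external locking
  private val jobs = new ConcurrentHashMap[String, StreamJobDesc]()

  def register(desc: StreamJobDesc): Unit = jobs.put(desc.jobId, desc)

  def remove(jobId: String): Unit = jobs.remove(jobId)

  def getAllJobs: Iterable[StreamJobDesc] = jobs.values().asScala
}
```

---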
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2328#discussion_r189615740

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonShowStreamsCommand.scala ---

@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.stream
+
+import java.util.Date
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.execution.command.MetadataCommand
+import org.apache.spark.sql.types.StringType
+
+import org.apache.carbondata.stream.StreamJobManager
+
+/**
+ * Show all streams created or on a specified table
+ */
+case class CarbonShowStreamsCommand(
+    tableOp: Option[TableIdentifier]
+) extends MetadataCommand {
+  override def output: Seq[Attribute] = {
+    Seq(AttributeReference("JobId", StringType, nullable = false)(),
+      AttributeReference("Status", StringType, nullable = false)(),
+      AttributeReference("Source", StringType, nullable = false)(),
+      AttributeReference("Sink", StringType, nullable = false)(),
+      AttributeReference("Start Time", StringType, nullable = false)(),
+      AttributeReference("Time Elapse", StringType, nullable = false)())
+  }
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    val jobs = tableOp match {
+      case None => StreamJobManager.getAllJobs.toSeq
+      case Some(table) =>
+        val carbonTable = CarbonEnv.getCarbonTable(table.database, table.table)(sparkSession)
+        StreamJobManager.getAllJobs.filter { job =>
+          job.sinkTable.equalsIgnoreCase(carbonTable.getTableName) &&
+          job.sinkDb.equalsIgnoreCase(carbonTable.getDatabaseName)
+        }.toSeq
+    }
+
+    jobs.map { job =>
+      val elapsedTime = System.currentTimeMillis() - job.startTime
+      Row(
+        job.streamingQuery.id.toString,
+        if (job.streamingQuery.isActive) "RUNNING" else "FAILED",
+        s"${ job.sourceDb }.${ job.sourceTable }",
+        s"${ job.sinkDb }.${ job.sinkTable }",
+        new Date(job.startTime).toString,
+        String.format(
+          "%s days, %s hours, %s min, %s sec",
+          TimeUnit.MILLISECONDS.toDays(elapsedTime).toString,
+          TimeUnit.MILLISECONDS.toHours(elapsedTime).toString,

--- End diff --

toHours will give the total hours elapsed and toMinutes will give the total minutes elapsed, so the output will not be in the format (10 days, 1 hours, 5 min, 34 sec).

---
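In other words, the `String.format` call above would print cumulative totals (e.g. "1 days, 25 hours, 1501 min, ..."). A minimal sketch of the fix the reviewer is asking for, taking each smaller unit modulo the next larger one:

```scala
import java.util.concurrent.TimeUnit

object ElapsedFormat {
  def format(elapsedMillis: Long): String = {
    // Each field is a remainder, not a running total
    val days    = TimeUnit.MILLISECONDS.toDays(elapsedMillis)
    val hours   = TimeUnit.MILLISECONDS.toHours(elapsedMillis)   % 24
    val minutes = TimeUnit.MILLISECONDS.toMinutes(elapsedMillis) % 60
    val seconds = TimeUnit.MILLISECONDS.toSeconds(elapsedMillis) % 60
    s"$days days, $hours hours, $minutes min, $seconds sec"
  }

  def main(args: Array[String]): Unit = {
    // 1 day + 1 hour + 1 minute + 1 second = 90061000 ms
    println(format(90061000L)) // prints: 1 days, 1 hours, 1 min, 1 sec
  }
}
```

---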
Github user xuchuanyin commented on the issue:
https://github.com/apache/carbondata/pull/2328 Since you provide the `CREATE STREAM SOURCE XXX` grammar, do we need a corresponding `SHOW STREAM SOURCE ...`? ---
Github user xuchuanyin commented on the issue:
https://github.com/apache/carbondata/pull/2328 In the PR description, it would be better to polish the example output of `SHOW STREAMS` and `KILL STREAM`, and to use the table names you introduced above in `Start a streaming job`. ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2328 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6019/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2328 Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4861/ ---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2328 SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5031/ ---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2328 SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5033/ ---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2328 SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5037/ ---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2328 SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5038/ ---
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2328#discussion_r189764133

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamSourceCommand.scala ---

@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.stream
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.command.{Field, MetadataCommand, TableNewProcessor}
+import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+
+/**
+ * This command is used to create Stream Source, which is implemented as a Carbon Table
+ */
+case class CarbonCreateStreamSourceCommand(
+    dbName: Option[String],
+    tableName: String,
+    fields: Seq[Field],
+    tblProperties: Map[String, String]
+) extends MetadataCommand {
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    val tableModel = new CarbonSpark2SqlParser().prepareTableModel(
+      ifNotExistPresent = false,
+      dbName,
+      tableName,
+      fields,
+      Seq.empty,
+      mutable.Map[String, String](tblProperties.toSeq: _*),
+      None
+    )
+    val tableInfo = TableNewProcessor.apply(tableModel)

--- End diff --

`CREATE STREAM SOURCE` internally calls `CREATE TABLE`. What is the difference between them? If there is no difference, is `CREATE STREAM SOURCE` necessary? Can an ordinary carbon table be used as the source?

---
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2328#discussion_r189762936

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala ---

@@ -22,6 +22,7 @@ import scala.Array.canBuildFrom
 import scala.collection.JavaConverters._
 import scala.util.parsing.combinator.RegexParsers
 
+import org.apache.spark.sql.CarbonEnv

--- End diff --

? Only adding an import without changing the code?

---
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2328#discussion_r189764478

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---

@@ -145,6 +149,55 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     CarbonAlterTableFinishStreaming(dbName, table)
   }
 
+  /**
+   * The syntax of CREATE STREAM SOURCE
+   * CREATE STREAM SOURCE [dbName.]tableName (schema list)
+   * [TBLPROPERTIES('KEY'='VALUE')]
+   */
+  protected lazy val createStreamSource: Parser[LogicalPlan] =
+    CREATE ~> STREAM ~> SOURCE ~> (ident <~ ".").? ~ ident ~
+    ("(" ~> repsep(anyFieldDef, ",") <~ ")") ~
+    (TBLPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ {
+      case dbName ~ tableName ~ fields ~ map =>
+        val tblProperties = map.getOrElse(List[(String, String)]()).toMap[String, String]
+        CarbonCreateStreamSourceCommand(dbName, tableName, fields, tblProperties)
+    }
+
+  /**
+   * The syntax of CREATE STREAM
+   * CREATE STREAM ON TABLE [dbName.]tableName
+   * [STMPROPERTIES('KEY'='VALUE')]
+   * AS SELECT COUNT(COL1) FROM tableName
+   */
+  protected lazy val createStream: Parser[LogicalPlan] =
+    CREATE ~> STREAM ~> ON ~> TABLE ~> (ident <~ ".").? ~ ident ~
+    (STMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~
+    (AS ~> restInput) <~ opt(";") ^^ {
+      case dbName ~ tableName ~ options ~ query =>
+        val optionMap = options.getOrElse(List[(String, String)]()).toMap[String, String]
+        CarbonCreateStreamCommand(dbName, tableName, optionMap, query)
+    }
+
+  /**
+   * The syntax of KILL STREAM
+   * KILL STREAM ON TABLE [dbName].tableName
+   */
+  protected lazy val killStream: Parser[LogicalPlan] =

--- End diff --

What will happen if I call `KILL STREAM`? Should I run `CREATE STREAM` again if I want the stream to start again? If so, it would be better to change the grammar to `DROP STREAM ON TABLE ...`.

---
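For illustration, a self-contained toy version of the proposed `DROP STREAM ON TABLE` rule in the same scala-parser-combinators style the PR uses (requires the scala-parser-combinators module). The real rule would live inside `CarbonSpark2SqlParser` and reuse its keyword tokens; `DropStream` here is a hypothetical stand-in for the command class:

```scala
import scala.util.parsing.combinator.RegexParsers

case class DropStream(dbName: Option[String], tableName: String)

object DropStreamParser extends RegexParsers {
  private val ident: Parser[String] = """[a-zA-Z_][a-zA-Z0-9_]*""".r

  // DROP STREAM ON TABLE [dbName.]tableName[;]
  val dropStream: Parser[DropStream] =
    "DROP" ~> "STREAM" ~> "ON" ~> "TABLE" ~> (ident <~ ".").? ~ ident <~ opt(";") ^^ {
      case dbName ~ tableName => DropStream(dbName, tableName)
    }

  def main(args: Array[String]): Unit = {
    // Prints a successful parse: DropStream(Some(iot),room_temperature)
    println(parseAll(dropStream, "DROP STREAM ON TABLE iot.room_temperature"))
  }
}
```

---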
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2328#discussion_r189763349

--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala ---

@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.stream
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.streaming.StreamingQuery
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.spark.StreamingOption
+import org.apache.carbondata.streaming.CarbonStreamException
+
+object StreamJobManager {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  private val jobs = mutable.Map[String, StreamJobDesc]()
+
+  /**
+   * Start a spark streaming query
+   * @param sparkSession session instance
+   * @param sourceTable stream source table
+   * @param sinkTable sink table to insert to
+   * @param query query string
+   * @param streamDf dataframe that containing the query from stream source table
+   * @param options options provided by user
+   * @return Job ID
+   */
+  def startJob(
+      sparkSession: SparkSession,
+      sourceTable: CarbonTable,
+      sinkTable: CarbonTable,
+      query: String,
+      streamDf: DataFrame,
+      options: StreamingOption): String = {
+    val latch = new CountDownLatch(1)
+    var exception: Throwable = null
+    var job: StreamingQuery = null
+
+    // start a new thread to run the streaming ingest job, the job will be running
+    // until user stops it by STOP STREAM JOB

--- End diff --

outdated `STOP STREAM JOB`

---
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2328#discussion_r189778004

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamSourceCommand.scala ---

@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.stream
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.command.{Field, MetadataCommand, TableNewProcessor}
+import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+
+/**
+ * This command is used to create Stream Source, which is implemented as a Carbon Table
+ */
+case class CarbonCreateStreamSourceCommand(
+    dbName: Option[String],
+    tableName: String,
+    fields: Seq[Field],
+    tblProperties: Map[String, String]
+) extends MetadataCommand {
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    val tableModel = new CarbonSpark2SqlParser().prepareTableModel(

--- End diff --

1) 'streaming'='source' need not be explicitly passed by the user; this DDL can always add it internally.
2) SELECT * on a stream source table should fail.
3) Another DDL is required to show stream source tables, since it is hard to identify streaming source tables in the full list of tables.
4) The stream sink table DDL mentioned in the description is not required, as carbondata already supports streaming ingestion.

---
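On points 1) and 2), a minimal sketch of what the command could do internally (a hypothetical helper object, not the PR's code):

```scala
object StreamSourceRules {
  // Point 1: the user never passes 'streaming'='source'; the DDL adds it itself
  def withSourceProperty(userProps: Map[String, String]): Map[String, String] =
    userProps + ("streaming" -> "source")

  // Point 2: fail fast when a query targets a stream source table
  def assertNotStreamSource(tblProps: Map[String, String], tableName: String): Unit =
    if (tblProps.get("streaming").contains("source")) {
      throw new UnsupportedOperationException(
        s"Direct query on stream source table '$tableName' is not supported")
    }
}
```

---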
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2328#discussion_r189806161

--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala ---
(same hunk as quoted above, ending at the `mutable.Map[String, StreamJobDesc]` declaration)
--- End diff --

ok

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2328#discussion_r189806362

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala ---

@@ -22,6 +22,7 @@ import scala.Array.canBuildFrom
 import scala.collection.JavaConverters._
 import scala.util.parsing.combinator.RegexParsers
 
+import org.apache.spark.sql.CarbonEnv

--- End diff --

I will remove it.

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2328#discussion_r194650822

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonShowStreamsCommand.scala ---
(same hunk as quoted above, ending at the `TimeUnit.MILLISECONDS.toHours(elapsedTime)` line)
--- End diff --

ok

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2328#discussion_r194651326

--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala ---
(same hunk as quoted above, ending at the outdated `STOP STREAM JOB` comment)
--- End diff --

fixed

---