[GitHub] carbondata pull request #2328: Support StreamSQL for streaming job


[GitHub] carbondata pull request #2328: Support StreamSQL for streaming job

qiuchenjian-2
GitHub user jackylk opened a pull request:

    https://github.com/apache/carbondata/pull/2328

    Support StreamSQL for streaming job

    Currently, users need to write a Spark Streaming application to use the carbon streaming ingest feature, which is not easy for some users. With StreamSQL, users can manage streaming jobs more easily.
   
    1. CREATE STREAM SOURCE
   
       ```SQL
       CREATE STREAM SOURCE iot.sensor_event(
         time LONG,
         device_id INT,
         temperature double,
         humidity double)
       TBLPROPERTIES(
         'streaming'='source',
         'format'='kafka',
         'kafka.bootstrap.servers'='host1:port1,host2:port2',
         'topic'='update')
       ```
   
    2. CREATE SINK TABLE
   
       ```SQL
       CREATE TABLE iot.room_temperature(
         time LONG,
         room_id INT,
         temperature double)
       STORED AS carbondata
       TBLPROPERTIES('streaming'='sink') -- compatible with 'true'
       ```
   
    3. CREATE STREAM (Start a streaming job)
   
       ```SQL
       CREATE STREAM ON TABLE iot.room_temperature
       STMPROPERTIES(
         'trigger'='ProcessingTime/OneTimeTrigger',
         'interval'='5 seconds')
         AS
       SELECT time, lookup_room_id(device_id) as room_id, temperature
       FROM iot.sensor_event
       WHERE temperature between 0 and 40
       ```
   
   
   
    ## Streaming Job Management
   
    1. SHOW STREAMS
   
       ```SQL
       SHOW STREAMS [ON TABLE dbName.tableName]
       ```
   
       It will print
   
       | Jobid              | status  | Source | Sink | start time          | time elapsed |
       | ------------------ | ------- | ------ | ---- | ------------------- | ------------ |
       | jkfldsjkad-fkwelj0 | Started | device | fact | 2018-02-03 14:32:42 | 10d2h32m     |
   
    2. KILL STREAM
   
       ```SQL
       KILL STREAM ON TABLE dbName.tableName
       ```
   
     - [ ] Any interfaces changed?
     
     - [ ] Any backward compatibility impacted?
     
     - [ ] Document update required?
   
     - [ ] Testing done
            Please provide details on
            - Whether new unit test cases have been added or why no new tests are required?
            - How it is tested? Please attach test report.
            - Is it a performance related change? Please attach the performance test report.
            - Any additional information to help reviewers in testing this change.
           
     - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jackylk/incubator-carbondata stream

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2328.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2328
   
----
commit 2482c0367deb17b1d46b7676a1339d42e8329456
Author: Jacky Li <jacky.likun@...>
Date:   2018-05-21T13:49:33Z

    support StreamSQL

----


---

[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

qiuchenjian-2
Github user gvramana commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2328#discussion_r189606530
 
    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala ---
    @@ -0,0 +1,131 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.stream
    +
    +import java.util.concurrent.{CountDownLatch, TimeUnit}
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.{DataFrame, SparkSession}
    +import org.apache.spark.sql.streaming.StreamingQuery
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.spark.StreamingOption
    +import org.apache.carbondata.streaming.CarbonStreamException
    +
    +object StreamJobManager {
    +  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +  private val jobs = mutable.Map[String, StreamJobDesc]()
    --- End diff --
   
    Need to use a concurrent hash map here.
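
A minimal sketch of the suggestion above, assuming the job registry is backed by `java.util.concurrent.ConcurrentHashMap`. `JobRegistry` and `JobDesc` are hypothetical names used only for illustration (`StreamJobDesc` itself is not shown in this diff), so this is not the PR's actual code.

```scala
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._

// Hypothetical stand-in for StreamJobDesc, whose fields are not shown in the diff
final case class JobDesc(sinkDb: String, sinkTable: String, startTime: Long)

object JobRegistry {
  // ConcurrentHashMap supports concurrent reads and writes without external locking
  private val jobs = new ConcurrentHashMap[String, JobDesc]()

  // Registers a job; returns false if a job with the same id is already present
  def register(id: String, desc: JobDesc): Boolean =
    jobs.putIfAbsent(id, desc) == null

  // Removes a job and returns its descriptor, or None if the id was unknown
  def remove(id: String): Option[JobDesc] = Option(jobs.remove(id))

  // Strict snapshot of the jobs registered at the time of the call
  def all: Seq[JobDesc] = jobs.values().asScala.toVector
}
```

`putIfAbsent` makes the check for an existing id and the insert a single atomic step, which a plain `mutable.Map` cannot guarantee when several threads register jobs at once.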


---

[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user gvramana commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2328#discussion_r189615740
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonShowStreamsCommand.scala ---
    @@ -0,0 +1,74 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.command.stream
    +
    +import java.util.Date
    +import java.util.concurrent.TimeUnit
    +
    +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
    +import org.apache.spark.sql.execution.command.MetadataCommand
    +import org.apache.spark.sql.types.StringType
    +
    +import org.apache.carbondata.stream.StreamJobManager
    +
    +/**
    + * Show all streams created or on a specified table
    + */
    +case class CarbonShowStreamsCommand(
    +    tableOp: Option[TableIdentifier]
    +) extends MetadataCommand {
    +  override def output: Seq[Attribute] = {
    +    Seq(AttributeReference("JobId", StringType, nullable = false)(),
    +      AttributeReference("Status", StringType, nullable = false)(),
    +      AttributeReference("Source", StringType, nullable = false)(),
    +      AttributeReference("Sink", StringType, nullable = false)(),
    +      AttributeReference("Start Time", StringType, nullable = false)(),
    +      AttributeReference("Time Elapse", StringType, nullable = false)())
    +  }
    +
    +  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
    +    val jobs = tableOp match {
    +      case None => StreamJobManager.getAllJobs.toSeq
    +      case Some(table) =>
    +        val carbonTable = CarbonEnv.getCarbonTable(table.database, table.table)(sparkSession)
    +        StreamJobManager.getAllJobs.filter { job =>
    +          job.sinkTable.equalsIgnoreCase(carbonTable.getTableName) &&
    +          job.sinkDb.equalsIgnoreCase(carbonTable.getDatabaseName)
    +        }.toSeq
    +    }
    +
    +    jobs.map { job =>
    +      val elapsedTime = System.currentTimeMillis() - job.startTime
    +      Row(
    +        job.streamingQuery.id.toString,
    +        if (job.streamingQuery.isActive) "RUNNING" else "FAILED",
    +        s"${ job.sourceDb }.${ job.sourceTable }",
    +        s"${ job.sinkDb }.${ job.sinkTable }",
    +        new Date(job.startTime).toString,
    +        String.format(
    +          "%s days, %s hours, %s min, %s sec",
    +          TimeUnit.MILLISECONDS.toDays(elapsedTime).toString,
    +          TimeUnit.MILLISECONDS.toHours(elapsedTime).toString,
    --- End diff --
   
    `toHours` will give the total hours elapsed, and
    `toMinutes` will give the total minutes elapsed.
    The result will not be in the intended format (10 days, 1 hours, 5 min, 34 sec).
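
One possible way to address the comment above is to take each smaller unit modulo the size of the next larger one, so that every field holds only the remainder. This is a sketch under that assumption, not the fix that was actually committed; `ElapsedTime` and `format` are hypothetical names.

```scala
import java.util.concurrent.TimeUnit

object ElapsedTime {
  // Splits a duration in milliseconds into days, hours, minutes and seconds.
  // Each smaller field keeps only what is left over after the larger unit.
  def format(elapsedMillis: Long): String = {
    val days = TimeUnit.MILLISECONDS.toDays(elapsedMillis)
    val hours = TimeUnit.MILLISECONDS.toHours(elapsedMillis) % 24
    val minutes = TimeUnit.MILLISECONDS.toMinutes(elapsedMillis) % 60
    val seconds = TimeUnit.MILLISECONDS.toSeconds(elapsedMillis) % 60
    s"$days days, $hours hours, $minutes min, $seconds sec"
  }
}
```

For an elapsed time of 867,934,000 ms, `toHours` alone returns 241, while the modulo version reports 1 hour on top of 10 days.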


---

[GitHub] carbondata issue #2328: [CARBONDATA-2504][STREAM] Support StreamSQL for stre...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on the issue:

    https://github.com/apache/carbondata/pull/2328
 
    Since you provide the `CREATE STREAM SOURCE XXX` grammar, do we also need a `SHOW STREAM SOURCE ...`?


---

[GitHub] carbondata issue #2328: [CARBONDATA-2504][STREAM] Support StreamSQL for stre...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on the issue:

    https://github.com/apache/carbondata/pull/2328
 
    In the PR description, it would be better to improve the examples for `SHOW STREAMS` and `KILL STREAM` so that they use the table names from the `Start a streaming job` example above.


---

[GitHub] carbondata issue #2328: [CARBONDATA-2504][STREAM] Support StreamSQL for stre...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2328
 
    Build Failed with Spark 2.1.0. Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6019/



---

[GitHub] carbondata issue #2328: [CARBONDATA-2504][STREAM] Support StreamSQL for stre...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2328
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/4861/



---

[GitHub] carbondata issue #2328: [CARBONDATA-2504][STREAM] Support StreamSQL for stre...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2328
 
    SDV Build Failed. Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5031/



---

[GitHub] carbondata issue #2328: [CARBONDATA-2504][STREAM] Support StreamSQL for stre...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2328
 
    SDV Build Failed. Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5033/



---

[GitHub] carbondata issue #2328: [CARBONDATA-2504][STREAM] Support StreamSQL for stre...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2328
 
    SDV Build Failed. Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5037/



---

[GitHub] carbondata issue #2328: [CARBONDATA-2504][STREAM] Support StreamSQL for stre...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2328
 
    SDV Build Failed. Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5038/



---

[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2328#discussion_r189764133
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamSourceCommand.scala ---
    @@ -0,0 +1,50 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.command.stream
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.{Row, SparkSession}
    +import org.apache.spark.sql.execution.command.{Field, MetadataCommand, TableNewProcessor}
    +import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
    +import org.apache.spark.sql.parser.CarbonSpark2SqlParser
    +
    +/**
    + * This command is used to create Stream Source, which is implemented as a Carbon Table
    + */
    +case class CarbonCreateStreamSourceCommand(
    +    dbName: Option[String],
    +    tableName: String,
    +    fields: Seq[Field],
    +    tblProperties: Map[String, String]
    +) extends MetadataCommand {
    +  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
    +    val tableModel = new CarbonSpark2SqlParser().prepareTableModel(
    +      ifNotExistPresent = false,
    +      dbName,
    +      tableName,
    +      fields,
    +      Seq.empty,
    +      mutable.Map[String, String](tblProperties.toSeq: _*),
    +      None
    +    )
    +    val tableInfo = TableNewProcessor.apply(tableModel)
    --- End diff --
   
    `CREATE STREAM SOURCE` internally calls `CREATE TABLE`. What is the difference between them? If there is no difference, is `CREATE STREAM SOURCE` necessary? Can an ordinary carbon table be used as a source?


---

[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2328#discussion_r189762936
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala ---
    @@ -22,6 +22,7 @@ import scala.Array.canBuildFrom
     import scala.collection.JavaConverters._
     import scala.util.parsing.combinator.RegexParsers
     
    +import org.apache.spark.sql.CarbonEnv
    --- End diff --
   
    Why add only an import without changing any code?


---

[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2328#discussion_r189764478
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---
    @@ -145,6 +149,55 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
             CarbonAlterTableFinishStreaming(dbName, table)
         }
     
    +  /**
    +   * The syntax of CREATE STREAM SOURCE
    +   * CREATE STREAM SOURCE [dbName.]tableName (schema list)
    +   * [TBLPROPERTIES('KEY'='VALUE')]
    +   */
    +  protected lazy val createStreamSource: Parser[LogicalPlan] =
    +    CREATE ~> STREAM ~> SOURCE ~> (ident <~ ".").? ~ ident ~
    +    ("(" ~> repsep(anyFieldDef, ",") <~ ")") ~
    +    (TBLPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ {
    +      case dbName ~ tableName ~ fields ~ map =>
    +        val tblProperties = map.getOrElse(List[(String, String)]()).toMap[String, String]
    +        CarbonCreateStreamSourceCommand(dbName, tableName, fields, tblProperties)
    +    }
    +
    +  /**
    +   * The syntax of CREATE STREAM
    +   * CREATE STREAM ON TABLE [dbName.]tableName
    +   * [STMPROPERTIES('KEY'='VALUE')]
    +   * AS SELECT COUNT(COL1) FROM tableName
    +   */
    +  protected lazy val createStream: Parser[LogicalPlan] =
    +    CREATE ~> STREAM ~> ON ~> TABLE ~> (ident <~ ".").? ~ ident ~
    +    (STMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~
    +    (AS ~> restInput) <~ opt(";") ^^ {
    +      case dbName ~ tableName ~ options ~ query =>
    +        val optionMap = options.getOrElse(List[(String, String)]()).toMap[String, String]
    +        CarbonCreateStreamCommand(dbName, tableName, optionMap, query)
    +    }
    +
    +  /**
    +   * The syntax of KILL STREAM
    +   * KILL STREAM ON TABLE [dbName].tableName
    +   */
    +  protected lazy val killStream: Parser[LogicalPlan] =
    --- End diff --
   
    What will happen if I call `KILL STREAM`?
    Do I have to run `CREATE STREAM` again if I want the stream to start again? If so, it would be better to change the grammar to `DROP STREAM ON TABLE ...`


---

[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2328#discussion_r189763349
 
    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala ---
    @@ -0,0 +1,131 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.stream
    +
    +import java.util.concurrent.{CountDownLatch, TimeUnit}
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.{DataFrame, SparkSession}
    +import org.apache.spark.sql.streaming.StreamingQuery
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.spark.StreamingOption
    +import org.apache.carbondata.streaming.CarbonStreamException
    +
    +object StreamJobManager {
    +  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +  private val jobs = mutable.Map[String, StreamJobDesc]()
    +
    +  /**
    +   * Start a spark streaming query
    +   * @param sparkSession session instance
    +   * @param sourceTable stream source table
    +   * @param sinkTable sink table to insert to
    +   * @param query query string
    +   * @param streamDf dataframe that containing the query from stream source table
    +   * @param options options provided by user
    +   * @return Job ID
    +   */
    +  def startJob(
    +      sparkSession: SparkSession,
    +      sourceTable: CarbonTable,
    +      sinkTable: CarbonTable,
    +      query: String,
    +      streamDf: DataFrame,
    +      options: StreamingOption): String = {
    +    val latch = new CountDownLatch(1)
    +    var exception: Throwable = null
    +    var job: StreamingQuery = null
    +
    +    // start a new thread to run the streaming ingest job, the job will be running
    +    // until user stops it by STOP STREAM JOB
    --- End diff --
   
    The reference to `STOP STREAM JOB` in this comment is outdated.


---

[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user gvramana commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2328#discussion_r189778004
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamSourceCommand.scala ---
    @@ -0,0 +1,50 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.command.stream
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.{Row, SparkSession}
    +import org.apache.spark.sql.execution.command.{Field, MetadataCommand, TableNewProcessor}
    +import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
    +import org.apache.spark.sql.parser.CarbonSpark2SqlParser
    +
    +/**
    + * This command is used to create Stream Source, which is implemented as a Carbon Table
    + */
    +case class CarbonCreateStreamSourceCommand(
    +    dbName: Option[String],
    +    tableName: String,
    +    fields: Seq[Field],
    +    tblProperties: Map[String, String]
    +) extends MetadataCommand {
    +  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
    +    val tableModel = new CarbonSpark2SqlParser().prepareTableModel(
    --- End diff --
   
    1) `'streaming'='source'` need not be passed explicitly by the user; this DDL can always add it internally.
    2) `SELECT *` on a stream source table should fail.
    3) Another DDL is required to show stream source tables, as it is hard to identify streaming source tables in the list of tables.
    4) The stream sink table DDL mentioned in the description is not required, as carbondata already supports streaming ingestion.
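
Point 1 could be sketched roughly as follows, assuming the command merges the property into the user-supplied map before building the table model. `StreamSourceProperties` and `withStreamingSource` are hypothetical names, and this is an illustration of the idea, not code from the PR.

```scala
object StreamSourceProperties {
  // Returns the user's table properties with 'streaming'='source' always set.
  // A value supplied by the user for the same key is overridden.
  def withStreamingSource(userProps: Map[String, String]): Map[String, String] =
    userProps + ("streaming" -> "source")
}
```

With this in place, `CREATE STREAM SOURCE` would no longer need `'streaming'='source'` in its `TBLPROPERTIES` clause. The sketch matches the key case-sensitively; whether property keys are normalized elsewhere is not shown in this diff.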


---

[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2328#discussion_r189806161
 
    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala ---
    @@ -0,0 +1,131 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.stream
    +
    +import java.util.concurrent.{CountDownLatch, TimeUnit}
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.{DataFrame, SparkSession}
    +import org.apache.spark.sql.streaming.StreamingQuery
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.spark.StreamingOption
    +import org.apache.carbondata.streaming.CarbonStreamException
    +
    +object StreamJobManager {
    +  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +  private val jobs = mutable.Map[String, StreamJobDesc]()
    --- End diff --
   
    ok


---

[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2328#discussion_r189806362
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala ---
    @@ -22,6 +22,7 @@ import scala.Array.canBuildFrom
     import scala.collection.JavaConverters._
     import scala.util.parsing.combinator.RegexParsers
     
    +import org.apache.spark.sql.CarbonEnv
    --- End diff --
   
    I will remove it


---

[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2328#discussion_r194650822
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonShowStreamsCommand.scala ---
    @@ -0,0 +1,74 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.command.stream
    +
    +import java.util.Date
    +import java.util.concurrent.TimeUnit
    +
    +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
    +import org.apache.spark.sql.execution.command.MetadataCommand
    +import org.apache.spark.sql.types.StringType
    +
    +import org.apache.carbondata.stream.StreamJobManager
    +
    +/**
    + * Show all streams created or on a specified table
    + */
    +case class CarbonShowStreamsCommand(
    +    tableOp: Option[TableIdentifier]
    +) extends MetadataCommand {
    +  override def output: Seq[Attribute] = {
    +    Seq(AttributeReference("JobId", StringType, nullable = false)(),
    +      AttributeReference("Status", StringType, nullable = false)(),
    +      AttributeReference("Source", StringType, nullable = false)(),
    +      AttributeReference("Sink", StringType, nullable = false)(),
    +      AttributeReference("Start Time", StringType, nullable = false)(),
    +      AttributeReference("Time Elapse", StringType, nullable = false)())
    +  }
    +
    +  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
    +    val jobs = tableOp match {
    +      case None => StreamJobManager.getAllJobs.toSeq
    +      case Some(table) =>
    +        val carbonTable = CarbonEnv.getCarbonTable(table.database, table.table)(sparkSession)
    +        StreamJobManager.getAllJobs.filter { job =>
    +          job.sinkTable.equalsIgnoreCase(carbonTable.getTableName) &&
    +          job.sinkDb.equalsIgnoreCase(carbonTable.getDatabaseName)
    +        }.toSeq
    +    }
    +
    +    jobs.map { job =>
    +      val elapsedTime = System.currentTimeMillis() - job.startTime
    +      Row(
    +        job.streamingQuery.id.toString,
    +        if (job.streamingQuery.isActive) "RUNNING" else "FAILED",
    +        s"${ job.sourceDb }.${ job.sourceTable }",
    +        s"${ job.sinkDb }.${ job.sinkTable }",
    +        new Date(job.startTime).toString,
    +        String.format(
    +          "%s days, %s hours, %s min, %s sec",
    +          TimeUnit.MILLISECONDS.toDays(elapsedTime).toString,
    +          TimeUnit.MILLISECONDS.toHours(elapsedTime).toString,
    --- End diff --
   
    ok


---

[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2328#discussion_r194651326
 
    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala ---
    @@ -0,0 +1,131 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.stream
    +
    +import java.util.concurrent.{CountDownLatch, TimeUnit}
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.{DataFrame, SparkSession}
    +import org.apache.spark.sql.streaming.StreamingQuery
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.spark.StreamingOption
    +import org.apache.carbondata.streaming.CarbonStreamException
    +
    +object StreamJobManager {
    +  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +  private val jobs = mutable.Map[String, StreamJobDesc]()
    +
    +  /**
    +   * Start a spark streaming query
    +   * @param sparkSession session instance
    +   * @param sourceTable stream source table
    +   * @param sinkTable sink table to insert to
    +   * @param query query string
    +   * @param streamDf dataframe that containing the query from stream source table
    +   * @param options options provided by user
    +   * @return Job ID
    +   */
    +  def startJob(
    +      sparkSession: SparkSession,
    +      sourceTable: CarbonTable,
    +      sinkTable: CarbonTable,
    +      query: String,
    +      streamDf: DataFrame,
    +      options: StreamingOption): String = {
    +    val latch = new CountDownLatch(1)
    +    var exception: Throwable = null
    +    var job: StreamingQuery = null
    +
    +    // start a new thread to run the streaming ingest job, the job will be running
    +    // until user stops it by STOP STREAM JOB
    --- End diff --
   
    fixed


---