[GitHub] carbondata pull request #2328: Support StreamSQL for streaming job


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2328#discussion_r194651389
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamSourceCommand.scala ---
    @@ -0,0 +1,50 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.command.stream
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.{Row, SparkSession}
    +import org.apache.spark.sql.execution.command.{Field, MetadataCommand, TableNewProcessor}
    +import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
    +import org.apache.spark.sql.parser.CarbonSpark2SqlParser
    +
    +/**
    + * This command is used to create Stream Source, which is implemented as a Carbon Table
    + */
    +case class CarbonCreateStreamSourceCommand(
    +    dbName: Option[String],
    +    tableName: String,
    +    fields: Seq[Field],
    +    tblProperties: Map[String, String]
    +) extends MetadataCommand {
    +  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
    +    val tableModel = new CarbonSpark2SqlParser().prepareTableModel(
    +      ifNotExistPresent = false,
    +      dbName,
    +      tableName,
    +      fields,
    +      Seq.empty,
    +      mutable.Map[String, String](tblProperties.toSeq: _*),
    +      None
    +    )
    +    val tableInfo = TableNewProcessor.apply(tableModel)
    --- End diff --
   
    CREATE STREAM SOURCE has been removed.


---

[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2328#discussion_r194665653
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---
    @@ -145,6 +149,55 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
             CarbonAlterTableFinishStreaming(dbName, table)
         }
     
    +  /**
    +   * The syntax of CREATE STREAM SOURCE
    +   * CREATE STREAM SOURCE [dbName.]tableName (schema list)
    +   * [TBLPROPERTIES('KEY'='VALUE')]
    +   */
    +  protected lazy val createStreamSource: Parser[LogicalPlan] =
    +    CREATE ~> STREAM ~> SOURCE ~> (ident <~ ".").? ~ ident ~
    +    ("(" ~> repsep(anyFieldDef, ",") <~ ")") ~
    +    (TBLPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ {
    +      case dbName ~ tableName ~ fields ~ map =>
    +        val tblProperties = map.getOrElse(List[(String, String)]()).toMap[String, String]
    +        CarbonCreateStreamSourceCommand(dbName, tableName, fields, tblProperties)
    +    }
    +
    +  /**
    +   * The syntax of CREATE STREAM
    +   * CREATE STREAM ON TABLE [dbName.]tableName
    +   * [STMPROPERTIES('KEY'='VALUE')]
    +   * AS SELECT COUNT(COL1) FROM tableName
    +   */
    +  protected lazy val createStream: Parser[LogicalPlan] =
    +    CREATE ~> STREAM ~> ON ~> TABLE ~> (ident <~ ".").? ~ ident ~
    +    (STMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~
    +    (AS ~> restInput) <~ opt(";") ^^ {
    +      case dbName ~ tableName ~ options ~ query =>
    +        val optionMap = options.getOrElse(List[(String, String)]()).toMap[String, String]
    +        CarbonCreateStreamCommand(dbName, tableName, optionMap, query)
    +    }
    +
    +  /**
    +   * The syntax of KILL STREAM
    +   * KILL STREAM ON TABLE [dbName].tableName
    +   */
    +  protected lazy val killStream: Parser[LogicalPlan] =
    --- End diff --
   
    If the stream is dropped, the user needs to trigger CREATE STREAM again.
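
The CREATE STREAM syntax documented in the diff above can be illustrated with a small standalone sketch. It uses plain regular expressions rather than the parser combinators in the PR, and the `CreateStream` case class and `StreamSqlSketch` object are hypothetical names for illustration only; the PR itself builds a `CarbonCreateStreamCommand`. Property values containing `)` are not handled here.

```scala
import scala.util.matching.Regex

// Hypothetical result type for illustration; not part of CarbonData.
final case class CreateStream(
    dbName: Option[String],
    tableName: String,
    options: Map[String, String],
    query: String)

object StreamSqlSketch {
  // CREATE STREAM ON TABLE [dbName.]tableName
  // [STMPROPERTIES('KEY'='VALUE', ...)] AS <query>
  private val createStream: Regex =
    ("""(?is)\s*CREATE\s+STREAM\s+ON\s+TABLE\s+(?:(\w+)\.)?(\w+)""" +
      """(?:\s+STMPROPERTIES\s*\((.*?)\))?\s+AS\s+(.+?)\s*;?\s*""").r

  // One 'KEY'='VALUE' pair inside the STMPROPERTIES list.
  private val keyValue: Regex = """'([^']*)'\s*=\s*'([^']*)'""".r

  def parseCreateStream(sql: String): Option[CreateStream] = sql match {
    case createStream(db, table, props, query) =>
      // Unmatched optional groups come back as null, so wrap them in Option.
      val options = Option(props).toSeq
        .flatMap(p => keyValue.findAllMatchIn(p))
        .map(m => m.group(1) -> m.group(2))
        .toMap
      Some(CreateStream(Option(db), table, options, query))
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    // 'k1' and 'k2' are placeholder property names, not real stream options.
    val sql =
      "CREATE STREAM ON TABLE db.sink STMPROPERTIES('k1'='v1', 'k2'='v2') " +
        "AS SELECT * FROM source;"
    println(parseCreateStream(sql))
  }
}
```

As in the PR's `restInput`, everything after `AS` is kept as an unparsed query string, so the same statement can simply be issued again if the stream has been dropped.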


---

[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2328#discussion_r194666440
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamSourceCommand.scala ---
    @@ -0,0 +1,50 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.command.stream
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.{Row, SparkSession}
    +import org.apache.spark.sql.execution.command.{Field, MetadataCommand, TableNewProcessor}
    +import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
    +import org.apache.spark.sql.parser.CarbonSpark2SqlParser
    +
    +/**
    + * This command is used to create Stream Source, which is implemented as a Carbon Table
    + */
    +case class CarbonCreateStreamSourceCommand(
    +    dbName: Option[String],
    +    tableName: String,
    +    fields: Seq[Field],
    +    tblProperties: Map[String, String]
    +) extends MetadataCommand {
    +  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
    +    val tableModel = new CarbonSpark2SqlParser().prepareTableModel(
    --- End diff --
   
    1. ok


---

[GitHub] carbondata issue #2328: [CARBONDATA-2504][STREAM] Support StreamSQL for stre...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2328
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6304/



---

[GitHub] carbondata issue #2328: [CARBONDATA-2504][STREAM] Support StreamSQL for stre...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2328
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5141/



---

[GitHub] carbondata issue #2328: [CARBONDATA-2504][STREAM] Support StreamSQL for stre...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2328
 
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5263/



---

[GitHub] carbondata issue #2328: [CARBONDATA-2504][STREAM] Support StreamSQL for stre...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2328
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6313/



---

[GitHub] carbondata issue #2328: [CARBONDATA-2504][STREAM] Support StreamSQL for stre...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2328
 
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5271/



---

[GitHub] carbondata issue #2328: [CARBONDATA-2504][STREAM] Support StreamSQL for stre...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2328
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5151/



---

[GitHub] carbondata issue #2328: [CARBONDATA-2504][STREAM] Support StreamSQL for stre...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2328
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6316/



---

[GitHub] carbondata issue #2328: [CARBONDATA-2504][STREAM] Support StreamSQL for stre...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2328
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5154/



---

[GitHub] carbondata issue #2328: [CARBONDATA-2504][STREAM] Support StreamSQL for stre...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2328
 
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5272/



---

[GitHub] carbondata issue #2328: [CARBONDATA-2504][STREAM] Support StreamSQL for stre...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on the issue:

    https://github.com/apache/carbondata/pull/2328
 
    retest this please


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2328: [CARBONDATA-2504][STREAM] Support StreamSQL for stre...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2328
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6320/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2328: [CARBONDATA-2504][STREAM] Support StreamSQL for stre...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2328
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5158/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2328: [CARBONDATA-2504][STREAM] Support StreamSQL for stre...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2328
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6321/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2328: [CARBONDATA-2504][STREAM] Support StreamSQL for stre...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2328
 
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5277/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2328: [CARBONDATA-2504][STREAM] Support StreamSQL for stre...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2328
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5159/



---

[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2328#discussion_r195666045
 
    --- Diff: integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java ---
    @@ -171,4 +176,21 @@ public Object wrapWithGenericRow(Object[] fields) {
         }
         return fields;
       }
    +
    +  public static StructType convertToSparkSchema(ColumnSchema[] carbonColumns) {
    --- End diff --
   
    I think this method does not handle complex data type schemas. Can you check how `CarbonRelation` handles complex schemas? The same approach can be used here.
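
To illustrate what handling complex types involves, here is a minimal sketch of a recursive conversion. The column model (`ColType`, `ArrayCol`, `StructCol`) is a hypothetical simplification, not CarbonData's `ColumnSchema` and not the logic in `CarbonRelation`. It only shows that array and struct columns need the converter to recurse into their children, here by producing a Spark-style type string.

```scala
// Hypothetical, simplified column model for illustration only.
sealed trait ColType
case object IntCol extends ColType
case object StringCol extends ColType
final case class ArrayCol(element: ColType) extends ColType
final case class StructCol(children: Seq[(String, ColType)]) extends ColType

object SchemaSketch {
  // Primitive types map directly; complex types recurse into their children.
  def toTypeString(t: ColType): String = t match {
    case IntCol => "int"
    case StringCol => "string"
    case ArrayCol(element) => s"array<${toTypeString(element)}>"
    case StructCol(children) =>
      children
        .map { case (name, child) => s"$name:${toTypeString(child)}" }
        .mkString("struct<", ",", ">")
  }

  def main(args: Array[String]): Unit = {
    val nested = StructCol(Seq("id" -> IntCol, "tags" -> ArrayCol(StringCol)))
    println(toTypeString(nested)) // prints struct<id:int,tags:array<string>>
  }
}
```

A flat loop over the top-level columns would only reach `IntCol` and `StringCol` here; the nested `tags` array is reached only through the recursive call.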


---

[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2328#discussion_r195666160
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/StreamingOption.scala ---
    @@ -0,0 +1,67 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.spark
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
    +
    +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.util.path.CarbonTablePath
    +import org.apache.carbondata.streaming.parser.CarbonStreamParser
    +
    +class StreamingOption(val userInputMap: Map[String, String]) {
    +  def trigger: Trigger = {
    --- End diff --
   
    Ideally, these could all be `lazy val` instead of `def`.
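
The difference behind this suggestion can be shown with a small sketch. The two classes below are hypothetical stand-ins, not the PR's `StreamingOption`, and the `interval` key and its default are assumptions for illustration. A `def` re-evaluates its body on every access, while a `lazy val` evaluates it once on first access and caches the result.

```scala
// Hypothetical option wrappers for illustration; not the PR's StreamingOption.
class OptionsWithDef(userInputMap: Map[String, String]) {
  var evaluations = 0

  // def: the body runs on every access.
  def intervalSeconds: Int = {
    evaluations += 1
    userInputMap.getOrElse("interval", "5").trim.toInt
  }
}

class OptionsWithLazyVal(userInputMap: Map[String, String]) {
  var evaluations = 0

  // lazy val: the body runs once, on first access, and the result is cached.
  lazy val intervalSeconds: Int = {
    evaluations += 1
    userInputMap.getOrElse("interval", "5").trim.toInt
  }
}

object LazyValDemo {
  def main(args: Array[String]): Unit = {
    val withDef = new OptionsWithDef(Map("interval" -> "10"))
    val d1 = withDef.intervalSeconds
    val d2 = withDef.intervalSeconds

    val withLazy = new OptionsWithLazyVal(Map("interval" -> "10"))
    val l1 = withLazy.intervalSeconds
    val l2 = withLazy.intervalSeconds

    // prints "def evaluations: 2, lazy val evaluations: 1"
    println(
      s"def evaluations: ${withDef.evaluations}, " +
        s"lazy val evaluations: ${withLazy.evaluations}")
  }
}
```

Since the input map is immutable, caching cannot return a stale value, and parsing or validation runs at most once per option.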


---