Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2328#discussion_r194651389 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamSourceCommand.scala --- @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command.stream + +import scala.collection.mutable + +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.execution.command.{Field, MetadataCommand, TableNewProcessor} +import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand +import org.apache.spark.sql.parser.CarbonSpark2SqlParser + +/** + * This command is used to create Stream Source, which is implemented as a Carbon Table + */ +case class CarbonCreateStreamSourceCommand( + dbName: Option[String], + tableName: String, + fields: Seq[Field], + tblProperties: Map[String, String] +) extends MetadataCommand { + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { + val tableModel = new CarbonSpark2SqlParser().prepareTableModel( + ifNotExistPresent = false, + dbName, + tableName, + fields, + Seq.empty, + mutable.Map[String, String](tblProperties.toSeq: _*), + None + ) + val tableInfo = TableNewProcessor.apply(tableModel) --- End diff -- Create Stream Source is removed --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2328#discussion_r194665653 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala --- @@ -145,6 +149,55 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { CarbonAlterTableFinishStreaming(dbName, table) } + /** + * The syntax of CREATE STREAM SOURCE + * CREATE STREAM SOURCE [dbName.]tableName (schema list) + * [TBLPROPERTIES('KEY'='VALUE')] + */ + protected lazy val createStreamSource: Parser[LogicalPlan] = + CREATE ~> STREAM ~> SOURCE ~> (ident <~ ".").? ~ ident ~ + ("(" ~> repsep(anyFieldDef, ",") <~ ")") ~ + (TBLPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ { + case dbName ~ tableName ~ fields ~ map => + val tblProperties = map.getOrElse(List[(String, String)]()).toMap[String, String] + CarbonCreateStreamSourceCommand(dbName, tableName, fields, tblProperties) + } + + /** + * The syntax of CREATE STREAM + * CREATE STREAM ON TABLE [dbName.]tableName + * [STMPROPERTIES('KEY'='VALUE')] + * AS SELECT COUNT(COL1) FROM tableName + */ + protected lazy val createStream: Parser[LogicalPlan] = + CREATE ~> STREAM ~> ON ~> TABLE ~> (ident <~ ".").? ~ ident ~ + (STMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~ + (AS ~> restInput) <~ opt(";") ^^ { + case dbName ~ tableName ~ options ~ query => + val optionMap = options.getOrElse(List[(String, String)]()).toMap[String, String] + CarbonCreateStreamCommand(dbName, tableName, optionMap, query) + } + + /** + * The syntax of KILL STREAM + * KILL STREAM ON TABLE [dbName].tableName + */ + protected lazy val killStream: Parser[LogicalPlan] = --- End diff -- If the stream is dropped, the user needs to trigger CREATE STREAM again --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2328#discussion_r194666440 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamSourceCommand.scala --- @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command.stream + +import scala.collection.mutable + +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.execution.command.{Field, MetadataCommand, TableNewProcessor} +import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand +import org.apache.spark.sql.parser.CarbonSpark2SqlParser + +/** + * This command is used to create Stream Source, which is implemented as a Carbon Table + */ +case class CarbonCreateStreamSourceCommand( + dbName: Option[String], + tableName: String, + fields: Seq[Field], + tblProperties: Map[String, String] +) extends MetadataCommand { + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { + val tableModel = new CarbonSpark2SqlParser().prepareTableModel( --- End diff -- 1. ok --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2328 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6304/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2328 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5141/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2328 SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5263/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2328 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6313/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2328 SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5271/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2328 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5151/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2328 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6316/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2328 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5154/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2328 SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5272/ --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on the issue:
https://github.com/apache/carbondata/pull/2328 retest this please --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2328 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6320/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2328 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5158/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2328 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6321/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2328 SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5277/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2328 Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5159/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2328#discussion_r195666045 --- Diff: integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java --- @@ -171,4 +176,21 @@ public Object wrapWithGenericRow(Object[] fields) { } return fields; } + + public static StructType convertToSparkSchema(ColumnSchema[] carbonColumns) { --- End diff -- I think this method does not handle complex datatype schemas. Can you check `CarbonRelation` to see how we are handling complex schemas? The same approach can be used here --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2328#discussion_r195666160 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/StreamingOption.scala --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark + +import scala.collection.mutable + +import org.apache.spark.sql.streaming.{ProcessingTime, Trigger} + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.streaming.parser.CarbonStreamParser + +class StreamingOption(val userInputMap: Map[String, String]) { + def trigger: Trigger = { --- End diff -- Ideally, these could all be lazy vals instead of defs --- |
Free forum by Nabble | Edit this page |