Github user ravipesala commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/2328#discussion_r195666269

    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/StreamingOption.scala ---
    @@ -0,0 +1,67 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.spark
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
    +
    +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.util.path.CarbonTablePath
    +import org.apache.carbondata.streaming.parser.CarbonStreamParser
    +
    +class StreamingOption(val userInputMap: Map[String, String]) {
    +  def trigger: Trigger = {
    +    val trigger = userInputMap.getOrElse(
    +      "trigger", throw new MalformedCarbonCommandException("trigger must be specified"))
    +    val interval = userInputMap.getOrElse(
    +      "interval", throw new MalformedCarbonCommandException("interval must be specified"))
    +    trigger match {
    +      case "ProcessingTime" => ProcessingTime(interval)
    --- End diff --

    `ProcessingTime` is deprecated, suggest using `Trigger.ProcessingTime`

---
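A minimal sketch of the suggested replacement, assuming a Spark 2.2+ build where the `Trigger.ProcessingTime` factory is available (the surrounding names are taken from the quoted diff):

    import org.apache.spark.sql.streaming.Trigger

    trigger match {
      // Trigger.ProcessingTime is the non-deprecated factory added in Spark 2.2;
      // it accepts the same interval string, e.g. "10 seconds"
      case "ProcessingTime" => Trigger.ProcessingTime(interval)
      case other => throw new MalformedCarbonCommandException(s"Unsupported trigger: $other")
    }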
Github user ravipesala commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/2328#discussion_r195668205

    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala ---
    @@ -0,0 +1,185 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.stream
    +
    +import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.{DataFrame, SparkSession}
    +import org.apache.spark.sql.streaming.StreamingQuery
    +import org.apache.spark.sql.types.{StructField, StructType}
    +
    +import org.apache.carbondata.common.exceptions.NoSuchStreamException
    +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.spark.StreamingOption
    +import org.apache.carbondata.spark.util.CarbonScalaUtil
    +import org.apache.carbondata.streaming.CarbonStreamException
    +import org.apache.carbondata.streaming.parser.CarbonStreamParser
    +
    +object StreamJobManager {
    --- End diff --

    Add a description to the class. Also better mention that stream jobs are kept only in driver memory and are not persisted, so other drivers cannot see ongoing stream jobs.

---
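A sketch of the kind of class description being asked for (the wording is illustrative, not the actual doc that was added):

    /**
     * Manages the streaming jobs launched by the CREATE STREAM command.
     *
     * Note: job metadata lives only in an in-memory map inside this driver and
     * is not persisted, so other drivers cannot see the ongoing stream jobs.
     */
    object StreamJobManager {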
Github user ravipesala commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/2328#discussion_r195669110

    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala ---
    @@ -0,0 +1,185 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.stream
    +
    +import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.{DataFrame, SparkSession}
    +import org.apache.spark.sql.streaming.StreamingQuery
    +import org.apache.spark.sql.types.{StructField, StructType}
    +
    +import org.apache.carbondata.common.exceptions.NoSuchStreamException
    +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.spark.StreamingOption
    +import org.apache.carbondata.spark.util.CarbonScalaUtil
    +import org.apache.carbondata.streaming.CarbonStreamException
    +import org.apache.carbondata.streaming.parser.CarbonStreamParser
    +
    +object StreamJobManager {
    +  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +
    +  // map of stream name to job desc
    +  private val jobs = new ConcurrentHashMap[String, StreamJobDesc]()
    +
    +  private def validateStreamName(streamName: String): Unit = {
    +    if (StreamJobManager.getAllJobs.exists(_.streamName.equalsIgnoreCase(streamName))) {
    --- End diff --

    I guess just checking `jobs.containsKey(streamName)` should be enough.

---
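A minimal sketch of the simplified check. Note that `containsKey` is an exact-case lookup, while the original used `equalsIgnoreCase`; if stream names are meant to be case-insensitive, they would need to be normalized (e.g. lower-cased) on both insert and lookup:

    private def validateStreamName(streamName: String): Unit = {
      // jobs is the ConcurrentHashMap[String, StreamJobDesc] from the quoted diff
      if (jobs.containsKey(streamName)) {
        throw new MalformedCarbonCommandException(
          s"Stream name '$streamName' already exists")
      }
    }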
Github user ravipesala commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/2328#discussion_r195673124

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---
    @@ -145,6 +149,41 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
           CarbonAlterTableFinishStreaming(dbName, table)
         }
    +
    +  /**
    +   * The syntax of CREATE STREAM
    +   * CREATE STREAM streamName ON TABLE [dbName.]tableName
    +   * [STMPROPERTIES('KEY'='VALUE')]
    +   * AS SELECT COUNT(COL1) FROM tableName
    +   */
    +  protected lazy val createStream: Parser[LogicalPlan] =
    +    (CREATE ~> STREAM ~> ident) ~ (ON ~> TABLE ~> (ident <~ ".").?) ~ ident ~
    +    (STMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~
    +    (AS ~> restInput) <~ opt(";") ^^ {
    +      case streamName ~ dbName ~ tableName ~ options ~ query =>
    +        val optionMap = options.getOrElse(List[(String, String)]()).toMap[String, String]
    +        CarbonCreateStreamCommand(streamName, dbName, tableName, optionMap, query)
    +    }
    +
    +  /**
    +   * The syntax of DROP STREAM
    +   * DROP STREAM streamName
    +   */
    +  protected lazy val dropStream: Parser[LogicalPlan] =
    +    DROP ~> STREAM ~> ident <~ opt(";") ^^ {
    --- End diff --

    Better support `IF EXISTS` also

---
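A sketch of what `IF EXISTS` support could look like. The `IF` and `EXISTS` keyword tokens and the `ifExists` flag on the drop command are assumptions for illustration, not part of the quoted diff:

    /**
     * The syntax of DROP STREAM
     * DROP STREAM [IF EXISTS] streamName
     */
    protected lazy val dropStream: Parser[LogicalPlan] =
      DROP ~> STREAM ~> opt(IF ~> EXISTS) ~ ident <~ opt(";") ^^ {
        case ifExists ~ streamName =>
          // when ifExists is defined, the command should not fail on a missing stream
          CarbonDropStreamCommand(streamName, ifExists.isDefined)
      }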
Github user ravipesala commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/2328#discussion_r195673232

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---
    @@ -145,6 +149,41 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
           CarbonAlterTableFinishStreaming(dbName, table)
         }
    +
    +  /**
    +   * The syntax of CREATE STREAM
    +   * CREATE STREAM streamName ON TABLE [dbName.]tableName
    +   * [STMPROPERTIES('KEY'='VALUE')]
    +   * AS SELECT COUNT(COL1) FROM tableName
    +   */
    +  protected lazy val createStream: Parser[LogicalPlan] =
    +    (CREATE ~> STREAM ~> ident) ~ (ON ~> TABLE ~> (ident <~ ".").?) ~ ident ~
    --- End diff --

    Better support `IF NOT EXISTS` also

---
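Along the same lines, a sketch of `IF NOT EXISTS` support for CREATE STREAM; the keyword tokens and the extra boolean parameter on `CarbonCreateStreamCommand` are assumed for illustration:

    protected lazy val createStream: Parser[LogicalPlan] =
      (CREATE ~> STREAM ~> opt(IF ~> NOT ~> EXISTS)) ~ ident ~
      (ON ~> TABLE ~> (ident <~ ".").?) ~ ident ~
      (STMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~
      (AS ~> restInput) <~ opt(";") ^^ {
        case ifNotExists ~ streamName ~ dbName ~ tableName ~ options ~ query =>
          val optionMap = options.getOrElse(List[(String, String)]()).toMap[String, String]
          // ifNotExists.isDefined would make the command a no-op if the stream already exists
          CarbonCreateStreamCommand(streamName, dbName, tableName, optionMap, query,
            ifNotExists.isDefined)
      }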
Github user ravipesala commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/2328#discussion_r195677693

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala ---
    @@ -0,0 +1,132 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.command.stream
    +
    +import java.util
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.command.DataCommand
    +import org.apache.spark.sql.execution.datasources.LogicalRelation
    +import org.apache.spark.sql.execution.streaming.StreamingRelation
    +import org.apache.spark.sql.streaming.DataStreamReader
    +import org.apache.spark.sql.types.{StringType, StructType}
    +
    +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.spark.StreamingOption
    +import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
    +import org.apache.carbondata.stream.StreamJobManager
    +
    +/**
    + * This command will start a Spark streaming job to insert rows from source to sink
    + */
    +case class CarbonCreateStreamCommand(
    +    streamName: String,
    +    sinkDbName: Option[String],
    +    sinkTableName: String,
    +    optionMap: Map[String, String],
    +    query: String
    +) extends DataCommand {
    +
    +  override def output: Seq[Attribute] =
    +    Seq(AttributeReference("Stream Name", StringType, nullable = false)(),
    +      AttributeReference("JobId", StringType, nullable = false)(),
    +      AttributeReference("Status", StringType, nullable = false)())
    +
    +  override def processData(sparkSession: SparkSession): Seq[Row] = {
    +    val df = sparkSession.sql(query)
    +    var sourceTable: CarbonTable = null
    +
    +    // find the streaming source table in the query
    +    // and replace it with StreamingRelation
    +    val streamLp = df.logicalPlan transform {
    +      case r: LogicalRelation
    +        if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    +           r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource =>
    +        val (source, streamingRelation) = prepareStreamingRelation(sparkSession, r)
    +        if (sourceTable != null && sourceTable.getTableName != source.getTableName) {
    --- End diff --

    I don't get the logic here. Do you want to verify that more than one stream source table is not present? Then better update the error message to `Stream query on more than one stream source table is not supported`.

---
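For reference, a sketch of the check with the clearer error message (variable names come from the quoted diff):

    // a second, different streaming source table was found while transforming the plan
    if (sourceTable != null && sourceTable.getTableName != source.getTableName) {
      throw new MalformedCarbonCommandException(
        "Stream query on more than one stream source table is not supported")
    }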
Github user CarbonDataQA commented on the issue:
    https://github.com/apache/carbondata/pull/2328

    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6352/

---
Github user CarbonDataQA commented on the issue:
    https://github.com/apache/carbondata/pull/2328

    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5190/

---
Github user ravipesala commented on the issue:
    https://github.com/apache/carbondata/pull/2328

    SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5303/

---
Github user jackylk commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/2328#discussion_r196086445

    --- Diff: integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java ---
    @@ -171,4 +176,21 @@ public Object wrapWithGenericRow(Object[] fields) {
         }
         return fields;
       }
    +
    +  public static StructType convertToSparkSchema(ColumnSchema[] carbonColumns) {
    --- End diff --

    fixed

---
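For context, a conceptual sketch (in Scala, for brevity) of what a carbon-to-Spark schema conversion such as `convertToSparkSchema` typically does; the type mapping shown is deliberately partial and illustrative, not the actual implementation:

    import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
    import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
    import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema

    def convertToSparkSchema(carbonColumns: Array[ColumnSchema]): StructType = {
      StructType(carbonColumns.map { col =>
        // map a few carbon types to their Spark equivalents; the real method covers all types
        val sparkType = col.getDataType match {
          case t if t == CarbonDataTypes.INT => DataTypes.IntegerType
          case t if t == CarbonDataTypes.LONG => DataTypes.LongType
          case t if t == CarbonDataTypes.DOUBLE => DataTypes.DoubleType
          case _ => DataTypes.StringType
        }
        StructField(col.getColumnName, sparkType, nullable = true)
      })
    }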
Github user jackylk commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/2328#discussion_r196087279

    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/StreamingOption.scala ---
    @@ -0,0 +1,67 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.spark
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
    +
    +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.util.path.CarbonTablePath
    +import org.apache.carbondata.streaming.parser.CarbonStreamParser
    +
    +class StreamingOption(val userInputMap: Map[String, String]) {
    +  def trigger: Trigger = {
    --- End diff --

    fixed

---
Github user jackylk commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/2328#discussion_r196087809

    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/StreamingOption.scala ---
    @@ -0,0 +1,67 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.spark
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
    +
    +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.util.path.CarbonTablePath
    +import org.apache.carbondata.streaming.parser.CarbonStreamParser
    +
    +class StreamingOption(val userInputMap: Map[String, String]) {
    +  def trigger: Trigger = {
    +    val trigger = userInputMap.getOrElse(
    +      "trigger", throw new MalformedCarbonCommandException("trigger must be specified"))
    +    val interval = userInputMap.getOrElse(
    +      "interval", throw new MalformedCarbonCommandException("interval must be specified"))
    +    trigger match {
    +      case "ProcessingTime" => ProcessingTime(interval)
    --- End diff --

    but it is still there in spark 2.1.0

---
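A short sketch of the cross-version trade-off being discussed (assuming the project builds against both Spark 2.1 and 2.2):

    // compiles on both Spark 2.1 and 2.2, but is deprecated from 2.2 onwards:
    val t: Trigger = ProcessingTime(interval)

    // available only from Spark 2.2, where it is the recommended factory:
    // val t: Trigger = Trigger.ProcessingTime(interval)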
Github user jackylk commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/2328#discussion_r196089005

    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala ---
    @@ -0,0 +1,185 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.stream
    +
    +import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.{DataFrame, SparkSession}
    +import org.apache.spark.sql.streaming.StreamingQuery
    +import org.apache.spark.sql.types.{StructField, StructType}
    +
    +import org.apache.carbondata.common.exceptions.NoSuchStreamException
    +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.spark.StreamingOption
    +import org.apache.carbondata.spark.util.CarbonScalaUtil
    +import org.apache.carbondata.streaming.CarbonStreamException
    +import org.apache.carbondata.streaming.parser.CarbonStreamParser
    +
    +object StreamJobManager {
    +  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +
    +  // map of stream name to job desc
    +  private val jobs = new ConcurrentHashMap[String, StreamJobDesc]()
    +
    +  private def validateStreamName(streamName: String): Unit = {
    +    if (StreamJobManager.getAllJobs.exists(_.streamName.equalsIgnoreCase(streamName))) {
    --- End diff --

    fixed

---
Github user jackylk commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/2328#discussion_r196089874

    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala ---
    @@ -0,0 +1,185 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.stream
    +
    +import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.{DataFrame, SparkSession}
    +import org.apache.spark.sql.streaming.StreamingQuery
    +import org.apache.spark.sql.types.{StructField, StructType}
    +
    +import org.apache.carbondata.common.exceptions.NoSuchStreamException
    +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.spark.StreamingOption
    +import org.apache.carbondata.spark.util.CarbonScalaUtil
    +import org.apache.carbondata.streaming.CarbonStreamException
    +import org.apache.carbondata.streaming.parser.CarbonStreamParser
    +
    +object StreamJobManager {
    --- End diff --

    fixed

---
Github user jackylk commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/2328#discussion_r196132467

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---
    @@ -145,6 +149,41 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
           CarbonAlterTableFinishStreaming(dbName, table)
         }
    +
    +  /**
    +   * The syntax of CREATE STREAM
    +   * CREATE STREAM streamName ON TABLE [dbName.]tableName
    +   * [STMPROPERTIES('KEY'='VALUE')]
    +   * AS SELECT COUNT(COL1) FROM tableName
    +   */
    +  protected lazy val createStream: Parser[LogicalPlan] =
    +    (CREATE ~> STREAM ~> ident) ~ (ON ~> TABLE ~> (ident <~ ".").?) ~ ident ~
    --- End diff --

    fixed

---
Github user jackylk commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/2328#discussion_r196132442

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---
    @@ -145,6 +149,41 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
           CarbonAlterTableFinishStreaming(dbName, table)
         }
    +
    +  /**
    +   * The syntax of CREATE STREAM
    +   * CREATE STREAM streamName ON TABLE [dbName.]tableName
    +   * [STMPROPERTIES('KEY'='VALUE')]
    +   * AS SELECT COUNT(COL1) FROM tableName
    +   */
    +  protected lazy val createStream: Parser[LogicalPlan] =
    +    (CREATE ~> STREAM ~> ident) ~ (ON ~> TABLE ~> (ident <~ ".").?) ~ ident ~
    +    (STMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~
    +    (AS ~> restInput) <~ opt(";") ^^ {
    +      case streamName ~ dbName ~ tableName ~ options ~ query =>
    +        val optionMap = options.getOrElse(List[(String, String)]()).toMap[String, String]
    +        CarbonCreateStreamCommand(streamName, dbName, tableName, optionMap, query)
    +    }
    +
    +  /**
    +   * The syntax of DROP STREAM
    +   * DROP STREAM streamName
    +   */
    +  protected lazy val dropStream: Parser[LogicalPlan] =
    +    DROP ~> STREAM ~> ident <~ opt(";") ^^ {
    --- End diff --

    fixed

---
Github user jackylk commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/2328#discussion_r196137795

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala ---
    @@ -0,0 +1,132 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.command.stream
    +
    +import java.util
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.command.DataCommand
    +import org.apache.spark.sql.execution.datasources.LogicalRelation
    +import org.apache.spark.sql.execution.streaming.StreamingRelation
    +import org.apache.spark.sql.streaming.DataStreamReader
    +import org.apache.spark.sql.types.{StringType, StructType}
    +
    +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.spark.StreamingOption
    +import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
    +import org.apache.carbondata.stream.StreamJobManager
    +
    +/**
    + * This command will start a Spark streaming job to insert rows from source to sink
    + */
    +case class CarbonCreateStreamCommand(
    +    streamName: String,
    +    sinkDbName: Option[String],
    +    sinkTableName: String,
    +    optionMap: Map[String, String],
    +    query: String
    +) extends DataCommand {
    +
    +  override def output: Seq[Attribute] =
    +    Seq(AttributeReference("Stream Name", StringType, nullable = false)(),
    +      AttributeReference("JobId", StringType, nullable = false)(),
    +      AttributeReference("Status", StringType, nullable = false)())
    +
    +  override def processData(sparkSession: SparkSession): Seq[Row] = {
    +    val df = sparkSession.sql(query)
    +    var sourceTable: CarbonTable = null
    +
    +    // find the streaming source table in the query
    +    // and replace it with StreamingRelation
    +    val streamLp = df.logicalPlan transform {
    +      case r: LogicalRelation
    +        if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    +           r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource =>
    +        val (source, streamingRelation) = prepareStreamingRelation(sparkSession, r)
    +        if (sourceTable != null && sourceTable.getTableName != source.getTableName) {
    --- End diff --

    fixed

---
Github user ravipesala commented on the issue:
    https://github.com/apache/carbondata/pull/2328

    SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/5310/

---
Github user CarbonDataQA commented on the issue:
    https://github.com/apache/carbondata/pull/2328

    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/6361/

---
Github user CarbonDataQA commented on the issue:
    https://github.com/apache/carbondata/pull/2328

    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/5199/

---