GitHub user ajithme opened a pull request:
https://github.com/apache/carbondata/pull/2495

Added Kafka integration with Carbon StreamSQL:

1. Pass the source table properties to streamReader.load()
2. Do not pass a schema when calling sparkSession.readStream
3. Remove the querySchema validation against the sink, since a DataFrame built from a Kafka source does not carry the data schema (the data is written into the 'value' column of the fixed Kafka schema)
4. Extract the DataFrame containing the actual data schema from the Kafka source at writeStream

Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily:

- [ ] Any interfaces changed? NO
- [ ] Any backward compatibility impacted? NO
- [ ] Document update required? Yes: need to use a CSV parser
- [ ] Testing done? Done
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ajithme/carbondata kafkaStreamSQLIntegration

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2495.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2495

----

commit 0560c5e69c61d6594a91994da918493335bd0cb4
Author: Ajith <ajith2489@...>
Date: 2018-07-12T03:47:22Z

    Added Kafka integration with Carbon StreamSQL
    1. Pass the source table properties to streamReader.load()
    2. Do not pass a schema when calling sparkSession.readStream
    3. Remove the querySchema validation against the sink, since a DataFrame built from a Kafka source does not carry the data schema (the data is written into the 'value' column of the fixed Kafka schema)
    4. Extract the DataFrame containing the actual data schema from the Kafka source at writeStream

----
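For context, items 2-4 amount to the following pattern in Spark Structured Streaming: a Kafka source always exposes the fixed envelope schema (key, value, topic, partition, offset, timestamp, timestampType), so no user schema can be passed to readStream, and the real row data must be parsed out of the 'value' column (as CSV, per the checklist note). A minimal sketch follows; the broker address, topic name, and the two-column data layout are illustrative assumptions, not taken from the patch.

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions.{col, split}
    import org.apache.spark.sql.types.IntegerType

    val spark = SparkSession.builder()
      .appName("kafka-streamsql-sketch")
      .getOrCreate()

    // No .schema(...) call: the Kafka source rejects user-provided schemas
    // and always returns (key, value, topic, partition, offset, timestamp, ...).
    val kafkaDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker
      .option("subscribe", "carbon_topic")                 // assumed topic
      .load()

    // The real row data is CSV text inside the `value` column; split it
    // into the sink table's columns (an assumed (id INT, name STRING) layout).
    val dataDf: DataFrame = kafkaDf
      .selectExpr("CAST(value AS STRING) AS line")
      .select(split(col("line"), ",").as("fields"))
      .select(
        col("fields").getItem(0).cast(IntegerType).as("id"),
        col("fields").getItem(1).as("name"))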
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2495

Can one of the admins verify this patch?

---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2495

Can one of the admins verify this patch?

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2495#discussion_r202222238

--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala ---
@@ -58,14 +58,16 @@ object StreamJobManager {
             "streaming sink table " +
             "('streaming' tblproperty is not 'sink' or 'true')")
     }
+    // TODO: validate query schema against sink (as in kafka we cannot get schema directly)
+    /*
--- End diff --

Can we move this validation earlier, so that we still do the validation for non-kafka formats?

---
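A minimal sketch of the reordering being suggested, using the validateSourceTable/validateSinkTable calls visible in the quoted diff and assuming an isKafka flag like the one the patch introduces; names are not verified against the merged code.

    // Sketch only: keep the schema check alive for every source except
    // Kafka, whose DataFrame schema is the fixed Kafka envelope rather
    // than the user's data schema.
    validateSourceTable(sourceTable)
    if (!isKafka) {
      validateSinkTable(streamDf.schema, sinkTable)
    }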
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2495#discussion_r202222321

--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala ---
@@ -102,14 +104,23 @@ object StreamJobManager {
     }
     validateSourceTable(sourceTable)
-    validateSinkTable(streamDf.schema, sinkTable)
+
+    // kafka source always has a fixed schema, need to get the actual schema
+    val isKafka = Option(sourceTable.getTableInfo.getFactTable.getTableProperties
--- End diff --

Please add a function in CarbonTable to get the underlying format.

---
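A hedged sketch of what such an accessor could look like; the method name and the "format" property key are assumptions, not the API actually added, and the property chain is taken from the quoted diff. Shown free-standing for brevity rather than as a member of CarbonTable.

    // Hypothetical helper: read the underlying streaming input format
    // from the fact table's properties instead of repeating the lookup
    // at every call site.
    def getFormat(table: CarbonTable): Option[String] = {
      Option(table.getTableInfo.getFactTable.getTableProperties.get("format"))
    }

    // Call-site usage replacing the inline property lookup from the diff:
    val isKafka = getFormat(sourceTable).contains("kafka")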
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2495#discussion_r202222412

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala ---
@@ -53,20 +52,22 @@ case class CarbonCreateStreamCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val df = sparkSession.sql(query)
     var sourceTable: CarbonTable = null
+    var dataFrame: Option[DataFrame] = None

     // find the streaming source table in the query
     // and replace it with StreamingRelation
--- End diff --

Please modify this comment to describe the updated code.

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2495#discussion_r202222509

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala ---
@@ -53,20 +52,22 @@ case class CarbonCreateStreamCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val df = sparkSession.sql(query)
     var sourceTable: CarbonTable = null
+    var dataFrame: Option[DataFrame] = None

     // find the streaming source table in the query
     // and replace it with StreamingRelation
-    val streamLp = df.logicalPlan transform {
+    df.logicalPlan transform {
       case r: LogicalRelation
         if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
            r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource =>
-        val (source, streamingRelation) = prepareStreamingRelation(sparkSession, r)
+        val (source, resolvedFrame) = prepareDataFrame(sparkSession, r)
--- End diff --

Please add a comment here to describe what is done inside prepareDataFrame.

---
Github user ajithme commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2495#discussion_r202232048

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala ---
@@ -53,20 +52,22 @@ case class CarbonCreateStreamCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val df = sparkSession.sql(query)
     var sourceTable: CarbonTable = null
+    var dataFrame: Option[DataFrame] = None

     // find the streaming source table in the query
     // and replace it with StreamingRelation
-    val streamLp = df.logicalPlan transform {
+    df.logicalPlan transform {
       case r: LogicalRelation
         if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
            r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource =>
-        val (source, streamingRelation) = prepareStreamingRelation(sparkSession, r)
+        val (source, resolvedFrame) = prepareDataFrame(sparkSession, r)
--- End diff --

Added method comments.

---
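For readers of the archive, a sketch of the sort of method comment being requested; the signature mirrors the call site in the quoted diff, the wording is illustrative rather than the comment actually committed, and the body is elided.

    /**
     * Resolves the streaming source for the given relation by calling
     * sparkSession.readStream with the source table's properties as
     * reader options, and returns the source CarbonTable together with
     * the resolved streaming DataFrame. For a Kafka source the DataFrame
     * still carries the fixed Kafka envelope schema, and the caller later
     * extracts the real data from the value column before writeStream.
     */
    private def prepareDataFrame(
        sparkSession: SparkSession,
        logicalRelation: LogicalRelation): (CarbonTable, DataFrame) = {
      ??? // body elided in this sketch
    }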
Github user jackylk commented on the issue:
https://github.com/apache/carbondata/pull/2495

LGTM. Merging into the carbonstore branch. Thanks for working on this.

---
Github user ajithme commented on the issue:
https://github.com/apache/carbondata/pull/2495

Merged: https://github.com/apache/carbondata/commit/9ac55a5a656ebe106697ca76a04916bea2ef3109

---