GitHub user jackylk opened a pull request:
https://github.com/apache/carbondata/pull/2695

[CARBONDATA-2919] Support ingest from Kafka in StreamSQL

WIP

- [ ] Any interfaces changed?
- [ ] Any backward compatibility impacted?
- [ ] Document update required?
- [ ] Testing done
      Please provide details on
      - Whether new unit test cases have been added or why no new tests are required?
      - How it is tested? Please attach test report.
      - Is it a performance related change? Please attach the performance test report.
      - Any additional information to help reviewers in testing this change.
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jackylk/incubator-carbondata kafka

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2695.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2695

----
commit 19dd6c641b9faa03da8b7a713873f398cd3b97d4
Author: Ajith <ajith2489@...>
Date:   2018-07-12T03:47:22Z

    [CARBONDATA-2736][CARBONSTORE] Kafka integration with Carbon StreamSQL

    Modification in this PR:
    1. Pass source table properties to streamReader.load()
    2. Do not pass schema when sparkSession.readStream
    3. Remove querySchema validation against sink as dataFrame made from kafka source will not have schema (its written in value column of schema)
    4. Extract the dataframe from kafka source which contain actual data schema @ writeStream

    This closes #2495

commit c2148e7e4fb6d7d5917a674075a7afdf941538c2
Author: Jacky Li <jacky.likun@...>
Date:   2018-09-05T17:45:40Z

    wip

----
---
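Note on items 3 and 4 of the first commit message: a Kafka source exposes a fixed set of columns, and the actual record sits as bytes in the `value` column, so the data schema has to be recovered by parsing that payload. The following is only a minimal plain-Scala sketch of that idea, not the code in this PR. `KafkaRecord`, `SinkRow` and `KafkaValueSketch` are hypothetical names, the row shape (id, name, city, salary) is borrowed from the example table later in this thread, and the payload is assumed to be one comma-separated line.

```scala
// Hypothetical stand-in for one record read from a Kafka source:
// the payload is opaque bytes, with no data schema attached.
final case class KafkaRecord(key: Array[Byte], value: Array[Byte])

// Assumed row shape, borrowed from the example table in this thread.
final case class SinkRow(id: Int, name: String, city: String, salary: Float)

object KafkaValueSketch {
  import java.nio.charset.StandardCharsets.UTF_8

  /** Decode the value bytes as one CSV line; None if it does not fit the row shape. */
  def parse(record: KafkaRecord): Option[SinkRow] = {
    // limit -1 keeps trailing empty fields so the arity check below is exact
    val fields = new String(record.value, UTF_8).split(",", -1).map(_.trim)
    fields match {
      case Array(id, name, city, salary) =>
        // toInt / toFloat throw on malformed input; Try turns that into None
        scala.util.Try(SinkRow(id.toInt, name, city, salary.toFloat)).toOption
      case _ => None
    }
  }
}
```

A record whose value does not split into four fields, or whose numeric fields do not parse, yields `None` here; how such bad records should really be handled is a policy decision this sketch does not cover.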
GitHub user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2695

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/100/
---
GitHub user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2695

Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/268/
---
GitHub user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2695

Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8338/
---
GitHub user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2695#discussion_r215484019

--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamSQLExample.scala ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.examples.util.ExampleUtils
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+
+// scalastyle:off println
+object StreamSQLExample {
+  def main(args: Array[String]) {
+
+    // setup paths
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+
+    val spark = ExampleUtils.createCarbonSession("StructuredStreamingExample", 4)
+    val streamTableName = s"stream_table"
+
+    val requireCreateTable = true
+
+    if (requireCreateTable) {
+      // drop table if exists previously
+      spark.sql(s"DROP TABLE IF EXISTS $streamTableName")
+      spark.sql("DROP TABLE IF EXISTS source")
+
+      // Create target carbon table and populate with initial data
+      spark.sql(
+        s"""
+           | CREATE TABLE $streamTableName(
+           | id INT,
+           | name STRING,
+           | city STRING,
+           | salary FLOAT
+           | )
+           | STORED AS carbondata
+           | TBLPROPERTIES(
+           | 'streaming'='true', 'sort_columns'='name')
+        """.stripMargin)
+
+      // batch load
+      val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
+      spark.sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$path'
+           | INTO TABLE $streamTableName
+           | OPTIONS('HEADER'='true')
+        """.stripMargin)
+    }
+
+    spark.sql(
+      """
+        | CREATE TABLE source (
+        | id INT,
+        | name STRING,
+        | city STRING,
+        | salary FLOAT
+        | )
+        | STORED AS carbondata
+        | TBLPROPERTIES(
+        | 'streaming'='source',
+        | 'format'='kafka',
+        | 'kafka.bootstrap.servers'='localhost:9092',
+        | 'subscribe'='test')
+      """.stripMargin)
+
+    spark.sql(
+      s"""
+         | CREATE STREAM ingest ON TABLE $streamTableName
+         | STMPROPERTIES(
+         | 'trigger' = 'ProcessingTime',
+         | 'interval' = '3 seconds',
+         | 'carbon.stream.parser'='org.apache.carbondata.streaming.parser.CSVStreamParserImp',
+         | 'BAD_RECORDS_ACTION'='force')
+         | AS SELECT * FROM source
+      """.stripMargin)
+
+    (1 to 1000).foreach { i =>
+      spark.sql(s"select * from $streamTableName")
+        .show(100, truncate = false)
+      Thread.sleep(5000)
+    }
+
+    spark.stop()
+    System.out.println("streaming finished")
+  }
+
+  def showTableCount(spark: SparkSession, tableName: String): Thread = {
--- End diff --

please remove unused code
---
GitHub user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2695#discussion_r215483849

--- Diff: examples/spark2/pom.xml ---
@@ -53,6 +53,11 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-sql_${scala.binary.version}</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
+      <version>2.2.1</version>
--- End diff --

please use spark.version property
---
GitHub user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2695#discussion_r215507635

--- Diff: examples/spark2/pom.xml ---
@@ -53,6 +53,11 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-sql_${scala.binary.version}</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
+      <version>2.2.1</version>
--- End diff --

ok, fixed
---
GitHub user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2695

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/111/
---
GitHub user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2695

Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/279/
---
GitHub user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2695

Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8349/
---
GitHub user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2695#discussion_r215972515

--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala ---
@@ -52,19 +52,24 @@ object StreamJobManager {
     }
   }
 
-  private def validateSinkTable(querySchema: StructType, sink: CarbonTable): Unit = {
+  private def validateSinkTable(validateQuerySchema: Boolean,
+      querySchema: StructType, sink: CarbonTable): Unit = {
     if (!sink.isStreamingSink) {
       throw new MalformedCarbonCommandException(s"Table ${sink.getTableName} is not " +
         "streaming sink table " +
         "('streaming' tblproperty is not 'sink' or 'true')")
     }
-    val fields = sink.getCreateOrderColumn(sink.getTableName).asScala.map { column =>
-      StructField(column.getColName,
-        CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(column.getDataType))
-    }
-    if (!querySchema.equals(StructType(fields))) {
-      throw new MalformedCarbonCommandException(s"Schema of table ${sink.getTableName} " +
-        s"does not match query output")
+    // TODO: validate query schema against sink in kafka (we cannot get schema directly)
+    if (validateQuerySchema) {
+      val fields = sink.getCreateOrderColumn(sink.getTableName).asScala.map { column =>
+        StructField(
+          column.getColName,
+          CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(column.getDataType))
+      }
+      if (!querySchema.equals(StructType(fields))) {
+        throw new MalformedCarbonCommandException(s"Schema of table ${ sink.getTableName } " +
--- End diff --

you can move the first half to the next line so that we can avoid string concatenation here. The same with line#58
---
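One way to read the suggestion above is to put the whole message on its own line as a single interpolated string, so no `+` is needed. The sketch below illustrates that layout together with the `validateQuerySchema` gate from the diff. It is an assumption-laden stand-in, not the PR's code: `Column`, `MalformedCommandException` and `SinkValidationSketch` are hypothetical plain-Scala substitutes for Spark's `StructType`/`StructField` and CarbonData's `MalformedCarbonCommandException`, used so the example runs without those libraries.

```scala
// Hypothetical plain-Scala stand-ins for the Spark and CarbonData types in the diff.
final case class Column(name: String, dataType: String)
final class MalformedCommandException(msg: String) extends Exception(msg)

object SinkValidationSketch {
  /**
   * Compare the query schema with the sink schema only when asked to.
   * The flag mirrors `validateQuerySchema` in the diff, which is switched
   * off for Kafka sources because their data schema is not known up front.
   */
  def validateSinkTable(
      validateQuerySchema: Boolean,
      querySchema: Seq[Column],
      sinkName: String,
      sinkSchema: Seq[Column]): Unit = {
    if (validateQuerySchema && querySchema != sinkSchema) {
      // The message sits on its own line as one interpolated string, no concatenation.
      throw new MalformedCommandException(
        s"Schema of table $sinkName does not match query output")
    }
  }
}
```

With the flag set to `false` the call returns without comparing anything, which corresponds to the TODO in the diff about validating Kafka-sourced queries later.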
GitHub user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2695#discussion_r215974054

--- Diff: pom.xml ---
@@ -217,6 +217,12 @@
         <version>${spark.version}</version>
         <scope>${spark.deps.scope}</scope>
       </dependency>
+      <dependency>
--- End diff --

Why is it needed for whole project scope? I think it will only be used in streaming related modules
---
GitHub user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2695#discussion_r215973078

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala ---
@@ -82,24 +91,37 @@ case class CarbonCreateStreamCommand(
       sourceTable = sourceTable,
       sinkTable = CarbonEnv.getCarbonTable(sinkDbName, sinkTableName)(sparkSession),
       query = query,
-      streamDf = Dataset.ofRows(sparkSession, streamLp),
-      options = new StreamingOption(optionMap)
+      streamDf = dataFrame.getOrElse(Dataset.ofRows(sparkSession, df.logicalPlan)),
+      options = new StreamingOption(newMap.toMap)
     )
     Seq(Row(streamName, jobId, "RUNNING"))
   }
 
-  private def prepareStreamingRelation(
+  /**
+   * Create a dataframe from source table of logicalRelation
+   * @param sparkSession
+   * @param logicalRelation
+   * @return sourceTable and its stream dataFrame
+   */
+  private def prepareDataFrame(
       sparkSession: SparkSession,
-      r: LogicalRelation): (CarbonTable, StreamingRelation) = {
-    val sourceTable = r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
+      logicalRelation: LogicalRelation): (CarbonTable, DataFrame) = {
+    val sourceTable = logicalRelation.relation
+      .asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
     val tblProperty = sourceTable.getTableInfo.getFactTable.getTableProperties
     val format = tblProperty.get("format")
     if (format == null) {
       throw new MalformedCarbonCommandException("Streaming from carbon file is not supported")
     }
-    val streamReader = sparkSession.readStream
-      .schema(getSparkSchema(sourceTable))
-      .format(format)
+    val streamReader = if (format != "kafka") {
--- End diff --

use 'equals' instead of '='
---
GitHub user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2695#discussion_r215970886

--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---
@@ -1182,6 +1182,15 @@ private static void setLocalDictInfo(CarbonTable table, TableInfo tableInfo) {
     }
   }
 
+  /**
+   * Return the format value defined in table properties
+   * @return String as per table properties, null if not defined
+   */
+  public String getFormat() {
+    return getTableInfo().getFactTable().getTableProperties()
+        .get("format");
--- End diff --

it seems this can be moved to the previous line
---
GitHub user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2695#discussion_r215973495

--- Diff: streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java ---
@@ -316,6 +316,10 @@ private void appendBlockletToDataFile() throws IOException {
   }
 
   public BlockletMinMaxIndex getBatchMinMaxIndex() {
+    if (output == null) {
+      return StreamSegment.mergeBlockletMinMax(
--- End diff --

no need for multiple lines here
---
GitHub user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2695#discussion_r215971533

--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala ---
@@ -102,14 +107,22 @@ object StreamJobManager {
     }
 
     validateSourceTable(sourceTable)
-    validateSinkTable(streamDf.schema, sinkTable)
+
+    // kafka source always have fixed schema, need to get actual schema
+    val isKafka = Option(sourceTable.getFormat).exists(_ == "kafka")
--- End diff --

for string equality, better to use equals
---
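For reference when weighing this comment: in Scala, `==` on reference types is null-safe and delegates to `equals`, so for strings the spellings below should give the same result, including when the table property is missing and `format` is `null`. This is a minimal sketch with hypothetical names (`FormatCheckSketch` and its methods are not part of the PR), intended only to compare the forms side by side.

```scala
object FormatCheckSketch {
  // `format` may be null when the table property is not defined,
  // as the Javadoc of getFormat() in this PR states.

  // Scala's == checks for null first, then calls equals.
  def withOperator(format: String): Boolean = format == "kafka"

  // Calling equals on the literal avoids a NullPointerException on a null argument.
  def withEquals(format: String): Boolean = "kafka".equals(format)

  // Option(null) is None, so contains returns false without any explicit null check.
  def withOption(format: String): Boolean = Option(format).contains("kafka")
}
```

Which spelling to prefer is then a matter of project style rather than behaviour; `Option(...).contains(...)` is a slightly shorter equivalent of the `exists(_ == "kafka")` form in the diff.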
GitHub user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2695

Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/162/
---
GitHub user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2695

Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/330/
---
GitHub user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2695

Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8401/
---
GitHub user chenliang613 commented on the issue:
https://github.com/apache/carbondata/pull/2695

retest this please
---