[GitHub] carbondata pull request #2695: [CARBONDATA-2919] Support ingest from Kafka i...


qiuchenjian-2
GitHub user jackylk opened a pull request:

    https://github.com/apache/carbondata/pull/2695

    [CARBONDATA-2919] Support ingest from Kafka in StreamSQL

    WIP
   
     - [ ] Any interfaces changed?
     
     - [ ] Any backward compatibility impacted?
     
     - [ ] Document update required?
   
     - [ ] Testing done
            Please provide details on
            - Whether new unit test cases have been added or why no new tests are required?
            - How it is tested? Please attach test report.
            - Is it a performance related change? Please attach the performance test report.
            - Any additional information to help reviewers in testing this change.
           
     - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
   


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jackylk/incubator-carbondata kafka

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2695.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2695
   
----
commit 19dd6c641b9faa03da8b7a713873f398cd3b97d4
Author: Ajith <ajith2489@...>
Date:   2018-07-12T03:47:22Z

    [CARBONDATA-2736][CARBONSTORE] Kafka integration with Carbon StreamSQL
   
    Modification in this PR:
    1. Pass source table properties to streamReader.load()
    2. Do not pass a schema to sparkSession.readStream
    3. Remove querySchema validation against the sink, because a DataFrame made from a Kafka source does not carry the data schema (the data is written in its value column)
    4. At writeStream, extract from the Kafka source a DataFrame that contains the actual data schema
   
    This closes #2495

commit c2148e7e4fb6d7d5917a674075a7afdf941538c2
Author: Jacky Li <jacky.likun@...>
Date:   2018-09-05T17:45:40Z

    wip

----
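Items 3 and 4 of the first commit follow from the fixed schema of Spark's Kafka source: every row has the columns key, value, topic, partition, offset, timestamp and timestampType, and the record itself sits in the binary value column. The sketch below shows, under stated assumptions, how such a payload could map onto the columns of the stream_table created in the StreamSQLExample diff. It assumes each Kafka message is one UTF-8 CSV line ordered id, name, city, salary. StreamRow and parseKafkaValue are hypothetical names, not CarbonData APIs; in the PR this decoding appears to be delegated to the configured carbon.stream.parser (CSVStreamParserImp in the example).

```scala
import java.nio.charset.StandardCharsets

// Hypothetical row type mirroring the sink table in the example:
// id INT, name STRING, city STRING, salary FLOAT
final case class StreamRow(id: Int, name: String, city: String, salary: Float)

// Decode one Kafka `value` payload (raw bytes holding a CSV line) into the
// sink's typed columns. The other Kafka columns (key, topic, partition,
// offset, timestamp, timestampType) are ignored here.
def parseKafkaValue(value: Array[Byte]): StreamRow = {
  // limit -1 keeps trailing empty fields so the arity check is honest
  val fields = new String(value, StandardCharsets.UTF_8).split(",", -1).map(_.trim)
  require(fields.length == 4, s"expected 4 fields, got ${fields.length}")
  StreamRow(fields(0).toInt, fields(1), fields(2), fields(3).toFloat)
}
```

A malformed message (wrong field count or a non-numeric id/salary) throws here; the real pipeline instead applies the stream's BAD_RECORDS_ACTION option.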


---

[GitHub] carbondata issue #2695: [CARBONDATA-2919] Support ingest from Kafka in Strea...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2695
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/100/



---

[GitHub] carbondata issue #2695: [CARBONDATA-2919] Support ingest from Kafka in Strea...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2695
 
    Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/268/



---

[GitHub] carbondata issue #2695: [CARBONDATA-2919] Support ingest from Kafka in Strea...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2695
 
    Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8338/



---

[GitHub] carbondata pull request #2695: [CARBONDATA-2919] Support ingest from Kafka i...

qiuchenjian-2
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2695#discussion_r215484019
 
    --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamSQLExample.scala ---
    @@ -0,0 +1,190 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.examples
    +
    +import java.io.{File, PrintWriter}
    +import java.net.ServerSocket
    +
    +import org.apache.spark.sql.{CarbonEnv, SparkSession}
    +import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
    +
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.core.util.path.CarbonTablePath
    +import org.apache.carbondata.examples.util.ExampleUtils
    +import org.apache.carbondata.streaming.parser.CarbonStreamParser
    +
    +// scalastyle:off println
    +object StreamSQLExample {
    +  def main(args: Array[String]) {
    +
    +    // setup paths
    +    val rootPath = new File(this.getClass.getResource("/").getPath
    +                            + "../../../..").getCanonicalPath
    +
    +    val spark = ExampleUtils.createCarbonSession("StructuredStreamingExample", 4)
    +    val streamTableName = s"stream_table"
    +
    +    val requireCreateTable = true
    +
    +    if (requireCreateTable) {
    +      // drop table if exists previously
    +      spark.sql(s"DROP TABLE IF EXISTS $streamTableName")
    +      spark.sql("DROP TABLE IF EXISTS source")
    +
    +      // Create target carbon table and populate with initial data
    +      spark.sql(
    +        s"""
    +           | CREATE TABLE $streamTableName(
    +           | id INT,
    +           | name STRING,
    +           | city STRING,
    +           | salary FLOAT
    +           | )
    +           | STORED AS carbondata
    +           | TBLPROPERTIES(
    +           | 'streaming'='true', 'sort_columns'='name')
    +          """.stripMargin)
    +
    +      // batch load
    +      val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
    +      spark.sql(
    +        s"""
    +           | LOAD DATA LOCAL INPATH '$path'
    +           | INTO TABLE $streamTableName
    +           | OPTIONS('HEADER'='true')
    +         """.stripMargin)
    +    }
    +
    +    spark.sql(
    +      """
    +        | CREATE TABLE source (
    +        | id INT,
    +        | name STRING,
    +        | city STRING,
    +        | salary FLOAT
    +        | )
    +        | STORED AS carbondata
    +        | TBLPROPERTIES(
    +        | 'streaming'='source',
    +        | 'format'='kafka',
    +        | 'kafka.bootstrap.servers'='localhost:9092',
    +        | 'subscribe'='test')
    +      """.stripMargin)
    +
    +    spark.sql(
    +      s"""
    +        | CREATE STREAM ingest ON TABLE $streamTableName
    +        | STMPROPERTIES(
    +        | 'trigger' = 'ProcessingTime',
    +        | 'interval' = '3 seconds',
    +        | 'carbon.stream.parser'='org.apache.carbondata.streaming.parser.CSVStreamParserImp',
    +        | 'BAD_RECORDS_ACTION'='force')
    +        | AS SELECT * FROM source
    +      """.stripMargin)
    +
    +    (1 to 1000).foreach { i =>
    +      spark.sql(s"select * from $streamTableName")
    +        .show(100, truncate = false)
    +      Thread.sleep(5000)
    +    }
    +
    +    spark.stop()
    +    System.out.println("streaming finished")
    +  }
    +
    +  def showTableCount(spark: SparkSession, tableName: String): Thread = {
    --- End diff --
   
    please remove unused code
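The source table in this example declares its Kafka connection entirely through TBLPROPERTIES ('format', 'kafka.bootstrap.servers', 'subscribe'), and the first commit passes source table properties on to streamReader.load(). A minimal sketch of that hand-off, assuming the Carbon-level keys 'streaming' and 'format' are not forwarded as reader options; toReaderConfig and carbonOnlyKeys are hypothetical names, not part of the PR:

```scala
// Assumption: these keys are interpreted by Carbon itself, not by the reader.
val carbonOnlyKeys = Set("streaming", "format")

// Split a source table's properties into the stream format and the
// options handed to the stream reader.
def toReaderConfig(tblProperties: Map[String, String]): (String, Map[String, String]) = {
  val format = tblProperties.getOrElse("format",
    throw new IllegalArgumentException("Streaming from carbon file is not supported"))
  val options = tblProperties.filter { case (key, _) => !carbonOnlyKeys.contains(key) }
  (format, options)
}
```

In Spark the pair would then drive `sparkSession.readStream.format(format).options(options).load()`, with no `.schema(...)` call for Kafka (item 2 of the first commit).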


---

[GitHub] carbondata pull request #2695: [CARBONDATA-2919] Support ingest from Kafka i...

qiuchenjian-2
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2695#discussion_r215483849
 
    --- Diff: examples/spark2/pom.xml ---
    @@ -53,6 +53,11 @@
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-sql_${scala.binary.version}</artifactId>
         </dependency>
    +    <dependency>
    +      <groupId>org.apache.spark</groupId>
    +      <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
    +      <version>2.2.1</version>
    --- End diff --
   
    please use the ${spark.version} property instead of hard-coding 2.2.1


---

[GitHub] carbondata pull request #2695: [CARBONDATA-2919] Support ingest from Kafka i...

qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2695#discussion_r215507635
 
    --- Diff: examples/spark2/pom.xml ---
    @@ -53,6 +53,11 @@
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-sql_${scala.binary.version}</artifactId>
         </dependency>
    +    <dependency>
    +      <groupId>org.apache.spark</groupId>
    +      <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
    +      <version>2.2.1</version>
    --- End diff --
   
    ok, fixed


---

[GitHub] carbondata issue #2695: [CARBONDATA-2919] Support ingest from Kafka in Strea...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2695
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/111/



---

[GitHub] carbondata issue #2695: [CARBONDATA-2919] Support ingest from Kafka in Strea...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2695
 
    Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/279/



---

[GitHub] carbondata issue #2695: [CARBONDATA-2919] Support ingest from Kafka in Strea...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2695
 
    Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8349/



---

[GitHub] carbondata pull request #2695: [CARBONDATA-2919] Support ingest from Kafka i...

qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2695#discussion_r215972515
 
    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala ---
    @@ -52,19 +52,24 @@ object StreamJobManager {
         }
       }
     
    -  private def validateSinkTable(querySchema: StructType, sink: CarbonTable): Unit = {
    +  private def validateSinkTable(validateQuerySchema: Boolean,
    +                                querySchema: StructType, sink: CarbonTable): Unit = {
         if (!sink.isStreamingSink) {
           throw new MalformedCarbonCommandException(s"Table ${sink.getTableName} is not " +
                                                     "streaming sink table " +
                                                     "('streaming' tblproperty is not 'sink' or 'true')")
         }
    -    val fields = sink.getCreateOrderColumn(sink.getTableName).asScala.map { column =>
    -      StructField(column.getColName,
    -        CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(column.getDataType))
    -    }
    -    if (!querySchema.equals(StructType(fields))) {
    -      throw new MalformedCarbonCommandException(s"Schema of table ${sink.getTableName} " +
    -                                                s"does not match query output")
    +    // TODO: validate query schema against sink in kafka (we cannot get schema directly)
    +    if (validateQuerySchema) {
    +      val fields = sink.getCreateOrderColumn(sink.getTableName).asScala.map { column =>
    +        StructField(
    +          column.getColName,
    +          CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(column.getDataType))
    +      }
    +      if (!querySchema.equals(StructType(fields))) {
    +        throw new MalformedCarbonCommandException(s"Schema of table ${ sink.getTableName } " +
    --- End diff --
   
    You can move the first half of the message to the next line so that string concatenation is avoided here.
    The same applies to line #58.
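The revised validateSinkTable compares the query schema with the sink only when validateQuerySchema is true, since a Kafka-backed query exposes Kafka's own columns rather than the sink's. A simplified, self-contained sketch of that rule, using a hypothetical ColumnDef in place of Spark's StructField and IllegalArgumentException in place of MalformedCarbonCommandException, with the error message written as a single interpolated string as suggested above; the isStreamingSink check is omitted:

```scala
// Hypothetical stand-in for a Spark StructField: column name plus type name.
final case class ColumnDef(name: String, dataType: String)

def validateSinkTable(
    validateQuerySchema: Boolean,
    querySchema: Seq[ColumnDef],
    sinkTableName: String,
    sinkColumns: Seq[ColumnDef]): Unit = {
  // Skipped for Kafka sources: their query output is key/value/topic/...,
  // not the sink's columns, so a direct comparison would always fail.
  if (validateQuerySchema && querySchema != sinkColumns) {
    // One interpolated string, no '+' concatenation.
    throw new IllegalArgumentException(
      s"Schema of table $sinkTableName does not match query output")
  }
}
```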


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2695: [CARBONDATA-2919] Support ingest from Kafka i...

qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2695#discussion_r215974054
 
    --- Diff: pom.xml ---
    @@ -217,6 +217,12 @@
             <version>${spark.version}</version>
             <scope>${spark.deps.scope}</scope>
           </dependency>
    +      <dependency>
    --- End diff --
   
    Why is it needed at whole-project scope? I think it will only be used in streaming-related modules.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2695: [CARBONDATA-2919] Support ingest from Kafka i...

qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2695#discussion_r215973078
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala ---
    @@ -82,24 +91,37 @@ case class CarbonCreateStreamCommand(
           sourceTable = sourceTable,
           sinkTable = CarbonEnv.getCarbonTable(sinkDbName, sinkTableName)(sparkSession),
           query = query,
    -      streamDf = Dataset.ofRows(sparkSession, streamLp),
    -      options = new StreamingOption(optionMap)
    +      streamDf = dataFrame.getOrElse(Dataset.ofRows(sparkSession, df.logicalPlan)),
    +      options = new StreamingOption(newMap.toMap)
         )
         Seq(Row(streamName, jobId, "RUNNING"))
       }
     
    -  private def prepareStreamingRelation(
    +  /**
    +   * Create a dataframe from source table of logicalRelation
    +   * @param sparkSession
    +   * @param logicalRelation
    +   * @return sourceTable and its stream dataFrame
    +   */
    +  private def prepareDataFrame(
           sparkSession: SparkSession,
    -      r: LogicalRelation): (CarbonTable, StreamingRelation) = {
    -    val sourceTable = r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
    +      logicalRelation: LogicalRelation): (CarbonTable, DataFrame) = {
    +    val sourceTable = logicalRelation.relation
    +      .asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
         val tblProperty = sourceTable.getTableInfo.getFactTable.getTableProperties
         val format = tblProperty.get("format")
         if (format == null) {
           throw new MalformedCarbonCommandException("Streaming from carbon file is not supported")
         }
    -    val streamReader = sparkSession.readStream
    -      .schema(getSparkSchema(sourceTable))
    -      .format(format)
    +    val streamReader = if (format != "kafka") {
    --- End diff --
   
    use 'equals' instead of '!='
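In Scala, `==` on references already delegates to `equals` and handles null, so this is largely a style choice; calling equals on the constant (`"kafka".equals(format)`) is the form that is also null-safe in Java. A minimal sketch of the branch under review, with hypothetical ReaderPlan types standing in for the DataFrame construction. It rests on the fact that Spark's Kafka source rejects a user-specified schema, which is why only other formats are given the source table's schema:

```scala
// Hypothetical description of how the stream reader would be configured.
sealed trait ReaderPlan
final case class WithSchema(format: String, columns: Seq[String]) extends ReaderPlan
final case class WithoutSchema(format: String) extends ReaderPlan

def planReader(format: String, tableColumns: Seq[String]): ReaderPlan = {
  if (format == null) {
    // Mirrors the existing guard: no 'format' property means a plain carbon table.
    throw new IllegalArgumentException("Streaming from carbon file is not supported")
  }
  // Constant-first equals is null-safe; Kafka gets no explicit schema.
  if ("kafka".equals(format)) WithoutSchema(format)
  else WithSchema(format, tableColumns)
}
```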


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2695: [CARBONDATA-2919] Support ingest from Kafka i...

qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2695#discussion_r215970886
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---
    @@ -1182,6 +1182,15 @@ private static void setLocalDictInfo(CarbonTable table, TableInfo tableInfo) {
         }
       }
     
    +  /**
    +   * Return the format value defined in table properties
    +   * @return String as per table properties, null if not defined
    +   */
    +  public String getFormat() {
    +    return getTableInfo().getFactTable().getTableProperties()
    +            .get("format");
    --- End diff --
   
    it seems this can be moved to the previous line


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2695: [CARBONDATA-2919] Support ingest from Kafka i...

qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2695#discussion_r215973495
 
    --- Diff: streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java ---
    @@ -316,6 +316,10 @@ private void appendBlockletToDataFile() throws IOException {
       }
     
       public BlockletMinMaxIndex getBatchMinMaxIndex() {
    +    if (output == null) {
    +      return StreamSegment.mergeBlockletMinMax(
    --- End diff --
   
    no need for multiple lines here


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #2695: [CARBONDATA-2919] Support ingest from Kafka i...

qiuchenjian-2
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2695#discussion_r215971533
 
    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala ---
    @@ -102,14 +107,22 @@ object StreamJobManager {
         }
     
         validateSourceTable(sourceTable)
    -    validateSinkTable(streamDf.schema, sinkTable)
    +
    +    // kafka source always have fixed schema, need to get actual schema
    +    val isKafka = Option(sourceTable.getFormat).exists(_ == "kafka")
    --- End diff --
   
    for string equality, it is better to use equals
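Because the new CarbonTable.getFormat returns null when the 'format' property is not defined, the Kafka check must be null-safe. A minimal sketch with a java.util.Map standing in for the table properties; getFormat here is a stand-in and isKafkaSource is a hypothetical helper. Option(...) turns the null into None, and the comparison uses equals on the constant as suggested:

```scala
import java.util.{Map => JMap}

// Stand-in for CarbonTable.getFormat: returns null when 'format' is not defined.
def getFormat(tableProperties: JMap[String, String]): String =
  tableProperties.get("format")

// Null-safe format check; a missing property yields false.
def isKafkaSource(tableProperties: JMap[String, String]): Boolean =
  Option(getFormat(tableProperties)).exists("kafka".equals(_))
```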


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #2695: [CARBONDATA-2919] Support ingest from Kafka in Strea...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2695
 
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/162/



---

[GitHub] carbondata issue #2695: [CARBONDATA-2919] Support ingest from Kafka in Strea...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2695
 
    Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/330/



---

[GitHub] carbondata issue #2695: [CARBONDATA-2919] Support ingest from Kafka in Strea...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2695
 
    Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8401/



---

[GitHub] carbondata issue #2695: [CARBONDATA-2919] Support ingest from Kafka in Strea...

qiuchenjian-2
Github user chenliang613 commented on the issue:

    https://github.com/apache/carbondata/pull/2695
 
    retest this please


---