[GitHub] carbondata pull request #1867: [CARBONDATA-2055]Support integrating Stream t...


[GitHub] carbondata issue #1867: [CARBONDATA-2055][Streaming][WIP]Support integrating...

qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1867
 
    SDV Build Success, please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3452/



---

[GitHub] carbondata issue #1867: [CARBONDATA-2055][Streaming][WIP]Support integrating...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1867
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3627/



---

[GitHub] carbondata issue #1867: [CARBONDATA-2055][Streaming][WIP]Support integrating...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1867
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2387/



---

[GitHub] carbondata issue #1867: [CARBONDATA-2055][Streaming][WIP]Support integrating...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1867
 
    SDV Build Fail, please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3469/



---

[GitHub] carbondata issue #1867: [CARBONDATA-2055][Streaming][WIP]Support integrating...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user zzcclp commented on the issue:

    https://github.com/apache/carbondata/pull/1867
 
    retest sdv please


---

[GitHub] carbondata issue #1867: [CARBONDATA-2055][Streaming][WIP]Support integrating...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1867
 
    SDV Build Fail, please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3476/



---

[GitHub] carbondata issue #1867: [CARBONDATA-2055][Streaming][WIP]Support integrating...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user zzcclp commented on the issue:

    https://github.com/apache/carbondata/pull/1867
 
    retest this please


---

[GitHub] carbondata issue #1867: [CARBONDATA-2055][Streaming][WIP]Support integrating...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user zzcclp commented on the issue:

    https://github.com/apache/carbondata/pull/1867
 
    retest sdv please


---

[GitHub] carbondata issue #1867: [CARBONDATA-2055][Streaming][WIP]Support integrating...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1867
 
    SDV Build Fail, please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3481/



---

[GitHub] carbondata issue #1867: [CARBONDATA-2055][Streaming][WIP]Support integrating...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1867
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3653/



---

[GitHub] carbondata issue #1867: [CARBONDATA-2055][Streaming][WIP]Support integrating...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1867
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2414/



---

[GitHub] carbondata issue #1867: [CARBONDATA-2055][Streaming][WIP]Support integrating...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user zzcclp commented on the issue:

    https://github.com/apache/carbondata/pull/1867
 
    retest sdv please


---

[GitHub] carbondata issue #1867: [CARBONDATA-2055][Streaming][WIP]Support integrating...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1867
 
    SDV Build Success, please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3490/



---

[GitHub] carbondata issue #1867: [CARBONDATA-2055][Streaming][WIP]Support integrating...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1867
 
    SDV Build Fail, please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3491/



---

[GitHub] carbondata issue #1867: [CARBONDATA-2055][Streaming][WIP]Support integrating...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user zzcclp commented on the issue:

    https://github.com/apache/carbondata/pull/1867
 
    retest sdv please


---

[GitHub] carbondata issue #1867: [CARBONDATA-2055][Streaming][WIP]Support integrating...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user zzcclp commented on the issue:

    https://github.com/apache/carbondata/pull/1867
 
    Can anyone help review this PR?


---

[GitHub] carbondata pull request #1867: [CARBONDATA-2055][Streaming][WIP]Support inte...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1867#discussion_r170854883
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSparkStreamFactory.scala ---
    @@ -0,0 +1,58 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.commons.lang3.StringUtils
    +
    +import org.apache.carbondata.streaming.CarbonStreamException
    +import org.apache.carbondata.streaming.CarbonStreamSparkStreaming
    +import org.apache.carbondata.streaming.CarbonStreamSparkStreamingWriter
    +
    +/**
    + * Create CarbonStreamSparkStreamingWriter for stream table
    --- End diff --
   
    change to `Create [[CarbonStreamSparkStreamingWriter]] for stream table`
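
For context, a minimal sketch of what the suggested Scaladoc link could look like. The `[[...]]` form is standard Scaladoc link syntax; the empty class and the `newWriter` method below are hypothetical stand-ins so the snippet compiles on its own, not the code from this PR.

```scala
// Hypothetical stand-in for the writer class in the diff, so the snippet compiles alone.
class CarbonStreamSparkStreamingWriter

/**
 * Create [[CarbonStreamSparkStreamingWriter]] for stream table
 * when integrate with Spark Streaming
 */
object CarbonSparkStreamFactory {
  // Hypothetical method, present only so the documented object has something to call.
  def newWriter(): CarbonStreamSparkStreamingWriter = new CarbonStreamSparkStreamingWriter
}
```

With the brackets in place, Scaladoc renders the class name as a link to its own page instead of plain text.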


---

[GitHub] carbondata pull request #1867: [CARBONDATA-2055][Streaming][WIP]Support inte...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1867#discussion_r170856116
 
    --- Diff: streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala ---
    @@ -0,0 +1,187 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.streaming
    +
    +import java.util
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.spark.sql.DataFrame
    +import org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink
    +import org.apache.spark.sql.execution.streaming.Sink
    +import org.apache.spark.sql.SaveMode
    +import org.apache.spark.sql.SparkSession
    +import org.apache.spark.streaming.Time
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +
    +class CarbonStreamSparkStreamingWriter {
    +
    +  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +
    +  private var isInitialize: Boolean = false
    +
    +  private var lock: ICarbonLock = null
    +  private var carbonTable: CarbonTable = null
    +  private var configuration: Configuration = null
    +  private var carbonAppendableStreamSink: Sink = null
    +  private val sparkSession: SparkSession = SparkSession.builder().getOrCreate()
    +
    +  def this(carbonTable: CarbonTable, configuration: Configuration) {
    +    this()
    +    this.carbonTable = carbonTable
    +    this.configuration = configuration
    +    this.option("dbName", carbonTable.getDatabaseName)
    +    this.option("tableName", carbonTable.getTableName)
    +  }
    +
    +  /**
    +   * Acquired the lock for stream table
    +   */
    +  def lockStreamTable(): Unit = {
    +    lock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
    +      LockUsage.STREAMING_LOCK)
    +    if (lock.lockWithRetries()) {
    +      LOGGER.info("Acquired the lock for stream table: " +
    +                  carbonTable.getDatabaseName + "." +
    +                  carbonTable.getTableName)
    +    } else {
    +      LOGGER.error("Not able to acquire the lock for stream table:" +
    +                   carbonTable.getDatabaseName + "." + carbonTable.getTableName)
    +      throw new InterruptedException(
    +        "Not able to acquire the lock for stream table: " + carbonTable.getDatabaseName + "." +
    +        carbonTable.getTableName)
    +    }
    +  }
    +
    +  /**
    +   * unlock for stream table
    +   */
    +  def unLockStreamTable(): Unit = {
    +    if (null != lock) {
    +      lock.unlock()
    +      LOGGER.info("unlock for stream table: " +
    +                  carbonTable.getDatabaseName + "." +
    +                  carbonTable.getTableName)
    +    }
    +  }
    +
    +  def initialize(): Unit = {
    +    carbonAppendableStreamSink = StreamSinkFactory.createStreamTableSink(
    +      sparkSession,
    +      configuration,
    +      carbonTable,
    +      extraOptions.toMap).asInstanceOf[CarbonAppendableStreamSink]
    +
    +    lockStreamTable()
    +
    +    isInitialize = true
    +  }
    +
    +  def writeStreamData(dataFrame: DataFrame, time: Time): Unit = {
    +    if (!isInitialize) {
    +      initialize()
    +    }
    +    carbonAppendableStreamSink.addBatch(time.milliseconds, dataFrame)
    +  }
    +
    +  private val extraOptions = new scala.collection.mutable.HashMap[String, String]
    +  private var mode: SaveMode = SaveMode.ErrorIfExists
    +
    +  /**
    +   * Specifies the behavior when data or table already exists. Options include:
    +   *   - `SaveMode.Overwrite`: overwrite the existing data.
    +   *   - `SaveMode.Append`: append the data.
    +   *   - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
    +   *   - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
    +   */
    +  def mode(saveMode: SaveMode): CarbonStreamSparkStreamingWriter = {
    +    if (mode == SaveMode.ErrorIfExists) {
    +      mode = saveMode
    +    }
    +    this
    +  }
    +
    +  /**
    +   * Specifies the behavior when data or table already exists. Options include:
    +   *   - `SaveMode.Overwrite`: overwrite the existing data.
    --- End diff --
   
    The saveMode parameter is a String here, so change the description to match.
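
To illustrate the point, a minimal sketch of a String-based overload whose documentation lists string names rather than `SaveMode` constants. The accepted names are modelled loosely on those Spark's `DataFrameWriter.mode(String)` takes; the `SaveMode` trait is a local stand-in so the sketch runs without Spark on the classpath, and `SaveModeSketch.parse` is a hypothetical helper, not part of the PR.

```scala
object SaveModeSketch {
  // Local stand-in for org.apache.spark.sql.SaveMode (assumption: used only for illustration).
  sealed trait SaveMode
  object SaveMode {
    case object Append extends SaveMode
    case object Overwrite extends SaveMode
    case object ErrorIfExists extends SaveMode
    case object Ignore extends SaveMode
  }

  /**
   * Specifies the behavior when data or table already exists. Options include:
   *   - `overwrite`: overwrite the existing data.
   *   - `append`: append the data.
   *   - `ignore`: ignore the operation (i.e. no-op).
   *   - `error` or `default`: default option, throw an exception at runtime.
   */
  def parse(saveMode: String): SaveMode =
    // Match case-insensitively so "Append" and "append" behave the same.
    saveMode.toLowerCase(java.util.Locale.ROOT) match {
      case "overwrite" => SaveMode.Overwrite
      case "append" => SaveMode.Append
      case "ignore" => SaveMode.Ignore
      case "error" | "default" => SaveMode.ErrorIfExists
      case other => throw new IllegalArgumentException(
        s"Unknown save mode: $other. Accepted modes are 'overwrite', 'append', 'ignore', 'error'.")
    }
}
```

A String overload of `mode` in the writer could then delegate to a helper like this and keep its Scaladoc in terms of the string names.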


---

[GitHub] carbondata pull request #1867: [CARBONDATA-2055][Streaming][WIP]Support inte...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1867#discussion_r170856731
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSparkStreamFactory.scala ---
    @@ -0,0 +1,58 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.commons.lang3.StringUtils
    +
    +import org.apache.carbondata.streaming.CarbonStreamException
    +import org.apache.carbondata.streaming.CarbonStreamSparkStreaming
    +import org.apache.carbondata.streaming.CarbonStreamSparkStreamingWriter
    +
    +/**
    + * Create CarbonStreamSparkStreamingWriter for stream table
    + * when integrate with Spark Streaming
    + */
    +object CarbonSparkStreamFactory {
    +
    +  def getStreamSparkStreamWriter(
    +    dbNameStr: String,
    +    tableName: String): CarbonStreamSparkStreamingWriter =
    +    synchronized {
    +    val dbName = if (StringUtils.isEmpty(dbNameStr)) "default" else dbNameStr
    +    val key = dbName + "." + tableName
    +    if (CarbonStreamSparkStreaming.getTableMap.containsKey(key)) {
    +      CarbonStreamSparkStreaming.getTableMap.get(key)
    +    } else {
    +      if (StringUtils.isEmpty(tableName) || tableName.contains(" ")) {
    +        throw new CarbonStreamException("Table creation failed. " +
    +                                        "Table name must not be blank or " +
    +                                        "cannot contain blank space")
    +      }
    +      val carbonTable = CarbonEnv.getCarbonTable(Some(dbName),
    +        tableName)(SparkSession.builder().getOrCreate())
    --- End diff --
   
    SparkSession is built twice, at line 47 and line 53; build it once and reuse it.
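
A minimal sketch of the pattern being asked for: resolve the session once, bind it to a `val`, and pass that value to every call site. `Session`, `loadTable`, and `buildWriter` are hypothetical stand-ins so the example runs without Spark; in the real factory the value would come from `SparkSession.builder().getOrCreate()`.

```scala
object SingleSessionSketch {
  // Hypothetical stand-in for SparkSession.
  final class Session(val name: String)

  object Session {
    // Counts lookups so the effect of reusing one value is observable.
    var lookups: Int = 0
    private lazy val shared = new Session("shared")
    def getOrCreate(): Session = { lookups += 1; shared }
  }

  // Hypothetical stand-ins for the two call sites that each need a session.
  def loadTable(db: String, table: String)(session: Session): String =
    s"$db.$table@${session.name}"

  def buildWriter(tableId: String, session: Session): String =
    s"writer($tableId)"

  // Resolve the session once and hand the same value to both call sites.
  def getWriter(db: String, table: String): String = {
    val session = Session.getOrCreate()
    val tableId = loadTable(db, table)(session)
    buildWriter(tableId, session)
  }
}
```

Binding the session to one `val` also makes it explicit that both calls see the same session.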


---

[GitHub] carbondata pull request #1867: [CARBONDATA-2055][Streaming][WIP]Support inte...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user zzcclp commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1867#discussion_r170958463
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSparkStreamFactory.scala ---
    @@ -0,0 +1,58 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.commons.lang3.StringUtils
    +
    +import org.apache.carbondata.streaming.CarbonStreamException
    +import org.apache.carbondata.streaming.CarbonStreamSparkStreaming
    +import org.apache.carbondata.streaming.CarbonStreamSparkStreamingWriter
    +
    +/**
    + * Create CarbonStreamSparkStreamingWriter for stream table
    --- End diff --
   
    Done


---