[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

GitHub user QiangCai opened a pull request:

    https://github.com/apache/carbondata/pull/1470

    [CARBONDATA-1572] Support streaming ingest and query

    1. Row format writer, with support for appending batch data

    2. StreamSinkProvider support, appending each micro-batch to the row format file (a usage sketch follows after this list)

    3. Row format reader, with support for splitting a row format file into small blocks

    4. Query over streaming row format files
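    A minimal usage sketch of the streaming ingest path via Spark Structured Streaming. The provider name, option keys and paths below are illustrative assumptions, not taken from this PR:

    ```
    import org.apache.spark.sql.SparkSession

    // Illustrative only: "carbondata" as the sink format name and the option
    // keys (dbName, tableName) are assumptions about how CarbonSource is exposed.
    val spark = SparkSession.builder().appName("carbon-streaming-sketch").getOrCreate()

    // any streaming source works here; a socket source keeps the sketch small
    val source = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9099")
      .load()

    val query = source.writeStream
      .format("carbondata")
      .option("checkpointLocation", "/tmp/carbon/checkpoint")
      .option("dbName", "default")
      .option("tableName", "stream_table")
      .start()

    query.awaitTermination()
    ```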

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/QiangCai/carbondata streaming_all

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/1470.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1470
   
----
commit 702d4564a64574f50bb5e9b9e431b9830b1e525f
Author: QiangCai <[hidden email]>
Date:   2017-10-18T03:13:00Z

    support streaming ingest and query

----


---

[GitHub] carbondata issue #1470: [CARBONDATA-1572] Support streaming ingest and query

Github user jackylk commented on the issue:

    https://github.com/apache/carbondata/pull/1470
 
    Please give the following description:
    ```
     - [X] Any interfaces changed?
     No
   
     - [X] Any backward compatibility impacted?
     No
   
     - [X] Document update required?
    No
   
     - [X] Testing done
    No new testcase is required
   
     - [X] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
    NA
    ```


---

[GitHub] carbondata issue #1470: [CARBONDATA-1572] Support streaming ingest and query

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1470
 
    Build Failed with Spark 2.1.0. Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/828/



---

[GitHub] carbondata issue #1470: [CARBONDATA-1572] Support streaming ingest and query

Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1470
 
    SDV Build Success. Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1462/



---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149064920
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---
    @@ -82,8 +84,43 @@ class CarbonScanRDD(
     
         // get splits
         val splits = format.getSplits(job)
    -    val result = distributeSplits(splits)
    -    result
    +
    +    // separate split
    +    // 1. for batch splits, invoke distributeSplits method to create partitions
    +    // 2. for stream splits, create partition for each split by default
    +    val columnarSplits = new ArrayList[InputSplit]()
    +    val streamSplits = new ArrayBuffer[InputSplit]()
    +    for(i <- 0 until splits.size()) {
    --- End diff --
   
    add space after `for`


---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149065241
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---
    @@ -82,8 +84,43 @@ class CarbonScanRDD(
     
         // get splits
         val splits = format.getSplits(job)
    -    val result = distributeSplits(splits)
    -    result
    +
    +    // separate split
    +    // 1. for batch splits, invoke distributeSplits method to create partitions
    +    // 2. for stream splits, create partition for each split by default
    +    val columnarSplits = new ArrayList[InputSplit]()
    +    val streamSplits = new ArrayBuffer[InputSplit]()
    +    for(i <- 0 until splits.size()) {
    --- End diff --
   
    use `splits.asScala.foreach` instead of `for` loop
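    A sketch of the suggested refactor, reusing the names from the quoted diff (an illustration of the suggestion, not the merged code):

    ```
    import scala.collection.JavaConverters._

    // iterate the java.util.List of splits directly instead of indexing it
    splits.asScala.foreach { split =>
      val carbonInputSplit = split.asInstanceOf[CarbonInputSplit]
      if ("row-format".equals(carbonInputSplit.getFormat)) {
        streamSplits += split
      } else {
        columnarSplits.add(split)
      }
    }
    ```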


---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149065658
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---
    @@ -82,8 +84,43 @@ class CarbonScanRDD(
     
         // get splits
         val splits = format.getSplits(job)
    -    val result = distributeSplits(splits)
    -    result
    +
    +    // separate split
    +    // 1. for batch splits, invoke distributeSplits method to create partitions
    +    // 2. for stream splits, create partition for each split by default
    +    val columnarSplits = new ArrayList[InputSplit]()
    +    val streamSplits = new ArrayBuffer[InputSplit]()
    +    for(i <- 0 until splits.size()) {
    +      val carbonInputSplit = splits.get(i).asInstanceOf[CarbonInputSplit]
    +      if ("row-format".equals(carbonInputSplit.getFormat)) {
    --- End diff --
   
    It is better to use an enum instead of a string comparison.
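    A sketch of what the enum-based check could look like, assuming a file format enum (for example `FileFormat.ROW_V1`) and a matching accessor on `CarbonInputSplit`; both names are assumptions, not part of this diff:

    ```
    // hypothetical enum-based comparison replacing the "row-format" string check
    if (carbonInputSplit.getFileFormat == FileFormat.ROW_V1) {
      streamSplits += splits.get(i)
    } else {
      columnarSplits.add(splits.get(i))
    }
    ```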


---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149066956
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---
    @@ -82,8 +84,43 @@ class CarbonScanRDD(
     
         // get splits
         val splits = format.getSplits(job)
    -    val result = distributeSplits(splits)
    -    result
    +
    +    // separate split
    +    // 1. for batch splits, invoke distributeSplits method to create partitions
    +    // 2. for stream splits, create partition for each split by default
    +    val columnarSplits = new ArrayList[InputSplit]()
    +    val streamSplits = new ArrayBuffer[InputSplit]()
    +    for(i <- 0 until splits.size()) {
    +      val carbonInputSplit = splits.get(i).asInstanceOf[CarbonInputSplit]
    +      if ("row-format".equals(carbonInputSplit.getFormat)) {
    +        streamSplits += splits.get(i)
    +      } else {
    +        columnarSplits.add(splits.get(i))
    +      }
    +    }
    +    val batchPartitions = distributeSplits(columnarSplits)
    --- End diff --
   
    suggest rename to `distributeBatchSplits`


---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149067107
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---
    @@ -82,8 +84,43 @@ class CarbonScanRDD(
     
         // get splits
         val splits = format.getSplits(job)
    -    val result = distributeSplits(splits)
    -    result
    +
    +    // separate split
    +    // 1. for batch splits, invoke distributeSplits method to create partitions
    +    // 2. for stream splits, create partition for each split by default
    +    val columnarSplits = new ArrayList[InputSplit]()
    +    val streamSplits = new ArrayBuffer[InputSplit]()
    +    for(i <- 0 until splits.size()) {
    +      val carbonInputSplit = splits.get(i).asInstanceOf[CarbonInputSplit]
    +      if ("row-format".equals(carbonInputSplit.getFormat)) {
    +        streamSplits += splits.get(i)
    +      } else {
    +        columnarSplits.add(splits.get(i))
    +      }
    +    }
    +    val batchPartitions = distributeSplits(columnarSplits)
    +    if (streamSplits.isEmpty) {
    +      batchPartitions
    +    } else {
    +      val index = batchPartitions.length
    +      val streamPartitions: ArrayBuffer[Partition] =
    +        streamSplits.zipWithIndex.map { splitWithIndex =>
    +          val multiBlockSplit =
    +            new CarbonMultiBlockSplit(identifier,
    +              Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava,
    +              splitWithIndex._1.getLocations)
    +          multiBlockSplit.setStream(true)
    +          new CarbonSparkPartition(id, splitWithIndex._2 + index, multiBlockSplit)
    +        }
    +      if (batchPartitions.isEmpty) {
    --- End diff --
   
    instead of this, you can do
    ```
    streamPartitions.appendAll(batchPartitions)
    streamPartitions.toArray
    ```
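    With that change, the tail of the block could collapse to something like the sketch below (names taken from the quoted diff; shown only to illustrate the suggestion):

    ```
    // build one partition per stream split, append the batch partitions,
    // and return a single array with no isEmpty special case
    val streamPartitions: ArrayBuffer[Partition] =
      streamSplits.zipWithIndex.map { case (split, i) =>
        val multiBlockSplit = new CarbonMultiBlockSplit(identifier,
          Seq(split.asInstanceOf[CarbonInputSplit]).asJava,
          split.getLocations)
        multiBlockSplit.setStream(true)
        new CarbonSparkPartition(id, i + batchPartitions.length, multiBlockSplit)
      }
    streamPartitions.appendAll(batchPartitions)
    streamPartitions.toArray
    ```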


---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149067739
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---
    @@ -82,8 +84,43 @@ class CarbonScanRDD(
     
         // get splits
         val splits = format.getSplits(job)
    -    val result = distributeSplits(splits)
    -    result
    +
    +    // separate split
    +    // 1. for batch splits, invoke distributeSplits method to create partitions
    +    // 2. for stream splits, create partition for each split by default
    +    val columnarSplits = new ArrayList[InputSplit]()
    +    val streamSplits = new ArrayBuffer[InputSplit]()
    +    for(i <- 0 until splits.size()) {
    +      val carbonInputSplit = splits.get(i).asInstanceOf[CarbonInputSplit]
    +      if ("row-format".equals(carbonInputSplit.getFormat)) {
    +        streamSplits += splits.get(i)
    +      } else {
    +        columnarSplits.add(splits.get(i))
    +      }
    +    }
    +    val batchPartitions = distributeSplits(columnarSplits)
    +    if (streamSplits.isEmpty) {
    +      batchPartitions
    +    } else {
    +      val index = batchPartitions.length
    +      val streamPartitions: ArrayBuffer[Partition] =
    +        streamSplits.zipWithIndex.map { splitWithIndex =>
    +          val multiBlockSplit =
    +            new CarbonMultiBlockSplit(identifier,
    +              Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava,
    +              splitWithIndex._1.getLocations)
    +          multiBlockSplit.setStream(true)
    --- End diff --
   
    I think you can set the same DATA_FILE_FORMAT enum in `multiBlockSplit`


---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149068014
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---
    @@ -210,8 +247,18 @@ class CarbonScanRDD(
         inputMetricsStats.initBytesReadCallback(context, inputSplit)
         val iterator = if (inputSplit.getAllSplits.size() > 0) {
           val model = format.getQueryModel(inputSplit, attemptContext)
    -      val reader = {
    -        if (vectorReader) {
    +      val reader: RecordReader[Void, Object] = {
    +        if (inputSplit.isStream) {
    +          DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
    +          val inputFormat = new CarbonStreamInputFormat
    +          val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
    +            .asInstanceOf[CarbonStreamRecordReader]
    +          streamReader.setVectorReader(vectorReader)
    +          model.setStatisticsRecorder(
    +            CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId))
    +          streamReader.setQueryModel(model)
    +          streamReader
    +        } else if (vectorReader) {
    --- End diff --
   
    It is better to use `else` instead of `else if`, so that the `else` block handles the columnar format.
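    A structural sketch of that suggestion: the stream branch first, then a plain `else` for the columnar path with the vector/row choice nested inside it. The helper names are hypothetical stand-ins for the existing reader-creation code in the diff:

    ```
    // structure only; createStreamReader / createVectorReader / createRowReader
    // stand in for the existing creation code shown in the diff
    val reader: RecordReader[Void, Object] =
      if (inputSplit.isStream) {
        createStreamReader(inputSplit, attemptContext, model)
      } else {
        // columnar format: choose vector or row-based reader as before
        if (vectorReader) createVectorReader(model) else createRowReader(model)
      }
    ```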


---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149070335
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---
    @@ -210,8 +247,18 @@ class CarbonScanRDD(
         inputMetricsStats.initBytesReadCallback(context, inputSplit)
         val iterator = if (inputSplit.getAllSplits.size() > 0) {
           val model = format.getQueryModel(inputSplit, attemptContext)
    -      val reader = {
    -        if (vectorReader) {
    +      val reader: RecordReader[Void, Object] = {
    +        if (inputSplit.isStream) {
    +          DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
    +          val inputFormat = new CarbonStreamInputFormat
    +          val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
    +            .asInstanceOf[CarbonStreamRecordReader]
    +          streamReader.setVectorReader(vectorReader)
    +          model.setStatisticsRecorder(
    +            CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId))
    +          streamReader.setQueryModel(model)
    --- End diff --
   
    It is better to pass `model` in the constructor of `CarbonStreamRecordReader`.


---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149070667
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
    @@ -208,6 +211,31 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
         }
       }
     
    +  override def createSink(sqlContext: SQLContext,
    --- End diff --
   
    Please add a description of this function.
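    A sketch of the kind of description that could be added, based only on the StreamSinkFactory and CarbonAppendableStreamSink code quoted later in this thread (wording is illustrative):

    ```
    /**
     * Creates a streaming [[Sink]] for this Carbon table so that Structured
     * Streaming queries can append micro-batches to it in row format,
     * e.g. by delegating to StreamSinkFactory.createStreamTableSink.
     */
    ```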


---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149070798
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---
    @@ -210,8 +247,18 @@ class CarbonScanRDD(
         inputMetricsStats.initBytesReadCallback(context, inputSplit)
         val iterator = if (inputSplit.getAllSplits.size() > 0) {
           val model = format.getQueryModel(inputSplit, attemptContext)
    -      val reader = {
    -        if (vectorReader) {
    +      val reader: RecordReader[Void, Object] = {
    +        if (inputSplit.isStream) {
    --- End diff --
   
    Add a description of this code block.


---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149071231
 
    --- Diff: streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala ---
    @@ -0,0 +1,162 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.streaming
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
    +import org.apache.spark.sql.SparkSession
    +import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, Sink}
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.datastore.impl.FileFactory
    +import org.apache.carbondata.core.dictionary.server.DictionaryServer
    +import org.apache.carbondata.core.metadata.encoder.Encoding
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.core.util.CarbonProperties
    +import org.apache.carbondata.core.util.path.CarbonStorePath
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.spark.util.DataLoadingUtil
    +import org.apache.carbondata.streaming.segment.StreamSegmentManager
    +
    +/**
    + * Stream sink factory
    + */
    +object StreamSinkFactory {
    +
    +  private val LOGGER = LogServiceFactory.getLogService(StreamSinkFactory.getClass.getCanonicalName)
    +
    +  def createStreamTableSink(
    +      sparkSession: SparkSession,
    +      carbonTable: CarbonTable,
    +      parameters: Map[String, String]): Sink = {
    +      validateParameters(parameters)
    --- End diff --
   
    Incorrect indentation.


---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149071325
 
    --- Diff: streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala ---
    @@ -0,0 +1,162 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.streaming
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
    +import org.apache.spark.sql.SparkSession
    +import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, Sink}
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.datastore.impl.FileFactory
    +import org.apache.carbondata.core.dictionary.server.DictionaryServer
    +import org.apache.carbondata.core.metadata.encoder.Encoding
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.core.util.CarbonProperties
    +import org.apache.carbondata.core.util.path.CarbonStorePath
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.spark.util.DataLoadingUtil
    +import org.apache.carbondata.streaming.segment.StreamSegmentManager
    +
    +/**
    + * Stream sink factory
    + */
    +object StreamSinkFactory {
    +
    +  private val LOGGER = LogServiceFactory.getLogService(StreamSinkFactory.getClass.getCanonicalName)
    --- End diff --
   
    This is not used


---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149072545
 
    --- Diff: streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala ---
    @@ -0,0 +1,162 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.streaming
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
    +import org.apache.spark.sql.SparkSession
    +import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, Sink}
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.datastore.impl.FileFactory
    +import org.apache.carbondata.core.dictionary.server.DictionaryServer
    +import org.apache.carbondata.core.metadata.encoder.Encoding
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.core.util.CarbonProperties
    +import org.apache.carbondata.core.util.path.CarbonStorePath
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.spark.util.DataLoadingUtil
    +import org.apache.carbondata.streaming.segment.StreamSegmentManager
    +
    +/**
    + * Stream sink factory
    + */
    +object StreamSinkFactory {
    +
    +  private val LOGGER = LogServiceFactory.getLogService(StreamSinkFactory.getClass.getCanonicalName)
    +
    +  def createStreamTableSink(
    +      sparkSession: SparkSession,
    +      carbonTable: CarbonTable,
    +      parameters: Map[String, String]): Sink = {
    +      validateParameters(parameters)
    +
    +    // prepare the stream segment
    +    val segmentId = getStreamSegmentId(carbonTable)
    +    // build load model
    +    val carbonLoadModel = buildCarbonLoadModelForStream(
    +      sparkSession,
    +      carbonTable,
    +      parameters,
    +      segmentId)
    +    // start server if necessary
    +    val server = startDictionaryServer(
    --- End diff --
   
    Should add a try-catch block to shut down the server if anything fails before this function returns.
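    A sketch of the suggested guard, reusing names from the quoted diff; `shutdown()` on `DictionaryServer` is an assumed API here:

    ```
    // once the server has been started, make sure it is shut down again
    // if any later step fails before the Sink is returned
    try {
      new CarbonAppendableStreamSink(
        sparkSession, carbonTable, segmentId, parameters, carbonLoadModel, server)
    } catch {
      case e: Exception =>
        server.foreach(_.shutdown())
        throw e
    }
    ```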


---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149072656
 
    --- Diff: streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import org.apache.spark.internal.io.FileCommitProtocol
    +import org.apache.spark.sql.{DataFrame, SparkSession}
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.datastore.impl.FileFactory
    +import org.apache.carbondata.core.dictionary.server.DictionaryServer
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.core.util.path.CarbonStorePath
    +import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.streaming.segment.StreamSegmentManager
    +
    +class CarbonAppendableStreamSink(
    --- End diff --
   
    Add a description of this class.
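    A sketch of the class description being asked for, based on the behaviour visible in this diff (wording is illustrative):

    ```
    /**
     * Structured Streaming [[Sink]] for CarbonData streaming ingest.
     * Each micro-batch is appended to the current streaming segment as row
     * format data; committed batch ids are tracked through a FileStreamSinkLog
     * so that already committed batches are skipped, and a new segment is
     * handed off once the current one exceeds the size threshold.
     */
    ```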


---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149077497
 
    --- Diff: streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import org.apache.spark.internal.io.FileCommitProtocol
    +import org.apache.spark.sql.{DataFrame, SparkSession}
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.datastore.impl.FileFactory
    +import org.apache.carbondata.core.dictionary.server.DictionaryServer
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.core.util.path.CarbonStorePath
    +import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.streaming.segment.StreamSegmentManager
    +
    +class CarbonAppendableStreamSink(
    +    sparkSession: SparkSession,
    +    val carbonTable: CarbonTable,
    +    var currentSegmentId: String,
    +    parameters: Map[String, String],
    +    carbonLoadModel: CarbonLoadModel,
    +    sever: Option[DictionaryServer]) extends Sink {
    +
    +  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +  private val carbonTablePath = CarbonStorePath
    +    .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
    +  private val fileLogPath = carbonTablePath.getStreamingLogDir
    +  private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, fileLogPath)
    +  // prepare configuration
    +  private val hadoopConf = {
    +    val conf = sparkSession.sessionState.newHadoopConf()
    +    CarbonStreamOutputFormat.setCarbonLoadModel(conf, carbonLoadModel)
    +    // put all parameters into hadoopConf
    +    parameters.foreach { entry =>
    +      conf.set(entry._1, entry._2)
    +    }
    +    conf
    +  }
    +
    +  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    +    if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
    +      LOGGER.info(s"Skipping already committed batch $batchId")
    +    } else {
    +      checkOrHandOffSegment()
    +
    +      val committer = FileCommitProtocol.instantiate(
    +        className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
    +        jobId = batchId.toString,
    +        outputPath = fileLogPath,
    +        isAppend = false)
    +
    +      committer match {
    +        case manifestCommitter: ManifestFileCommitProtocol =>
    +          manifestCommitter.setupManifestOptions(fileLog, batchId)
    +        case _ => // Do nothing
    +      }
    +
    +      CarbonStreamProcessor.writeDataFileJob(
    --- End diff --
   
    I think it is not necessary to create `CarbonStreamProcessor`; why not move this function here?


---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149078206
 
    --- Diff: streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import org.apache.spark.internal.io.FileCommitProtocol
    +import org.apache.spark.sql.{DataFrame, SparkSession}
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.datastore.impl.FileFactory
    +import org.apache.carbondata.core.dictionary.server.DictionaryServer
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.core.util.path.CarbonStorePath
    +import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.streaming.segment.StreamSegmentManager
    +
    +class CarbonAppendableStreamSink(
    +    sparkSession: SparkSession,
    +    val carbonTable: CarbonTable,
    +    var currentSegmentId: String,
    +    parameters: Map[String, String],
    +    carbonLoadModel: CarbonLoadModel,
    +    sever: Option[DictionaryServer]) extends Sink {
    +
    +  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +  private val carbonTablePath = CarbonStorePath
    +    .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
    +  private val fileLogPath = carbonTablePath.getStreamingLogDir
    +  private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, fileLogPath)
    +  // prepare configuration
    +  private val hadoopConf = {
    +    val conf = sparkSession.sessionState.newHadoopConf()
    +    CarbonStreamOutputFormat.setCarbonLoadModel(conf, carbonLoadModel)
    +    // put all parameters into hadoopConf
    +    parameters.foreach { entry =>
    +      conf.set(entry._1, entry._2)
    +    }
    +    conf
    +  }
    +
    +  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    +    if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
    +      LOGGER.info(s"Skipping already committed batch $batchId")
    +    } else {
    +      checkOrHandOffSegment()
    +
    +      val committer = FileCommitProtocol.instantiate(
    +        className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
    +        jobId = batchId.toString,
    +        outputPath = fileLogPath,
    +        isAppend = false)
    +
    +      committer match {
    +        case manifestCommitter: ManifestFileCommitProtocol =>
    +          manifestCommitter.setupManifestOptions(fileLog, batchId)
    +        case _ => // Do nothing
    +      }
    +
    +      CarbonStreamProcessor.writeDataFileJob(
    +        sparkSession,
    +        carbonTable,
    +        parameters,
    +        batchId,
    +        currentSegmentId,
    +        data.queryExecution,
    +        committer,
    +        hadoopConf,
    +        sever)
    +    }
    +  }
    +
    +  // if the directory size of current segment beyond the threshold, hand off new segment
    +  private def checkOrHandOffSegment(): Unit = {
    +    val segmentDir = carbonTablePath.getSegmentDir("0", currentSegmentId)
    +    val fileType = FileFactory.getFileType(segmentDir)
    +    if (StreamSegmentManager.STREAM_SEGMENT_MAX_SIZE <= FileFactory.getDirectorySize(segmentDir)) {
    --- End diff --
   
    Can we make use of metadata instead of checking the file system for every batch?
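    One possible direction, sketched only as an illustration: keep a running size for the current segment in memory (or in segment metadata) and avoid scanning the directory on every batch. `currentSegmentSize`, the method parameter and `handOffSegment` are hypothetical names, not part of this PR:

    ```
    // hypothetical: accumulate bytes written per batch instead of calling
    // FileFactory.getDirectorySize on every addBatch
    private var currentSegmentSize: Long = 0L

    private def checkOrHandOffSegment(bytesWrittenInBatch: Long): Unit = {
      currentSegmentSize += bytesWrittenInBatch
      if (currentSegmentSize >= StreamSegmentManager.STREAM_SEGMENT_MAX_SIZE) {
        // close the current streaming segment, open a new one, reset the counter
        currentSegmentId = StreamSegmentManager.handOffSegment(carbonTable)
        currentSegmentSize = 0L
      }
    }
    ```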


---