[GitHub] carbondata pull request #1566: [CARBONDATA-1586][Streaming] Support handoff ...


[GitHub] carbondata pull request #1566: [CARBONDATA-1586][Streaming] Support handoff ...

GitHub user QiangCai opened a pull request:

    https://github.com/apache/carbondata/pull/1566

    [CARBONDATA-1586][Streaming] Support handoff from row format to columnar format

    1. Configuration: rename `carbon.handoff.size` to `carbon.streaming.segment.max.size`
    2. SQL command: `alter table <table_name> compact 'streaming'`
    3. Refactor `CompactionType`
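
    For example, after this change a handoff of finished streaming segments
    could be triggered manually (a minimal sketch; `stream_table` is a
    hypothetical table name and `spark` an existing SparkSession):

        // hands off finished streaming segments from row format to columnar format
        spark.sql("ALTER TABLE stream_table COMPACT 'streaming'")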
   
     - [x] Any interfaces changed?
       Yes: `alter table <table_name> compact 'streaming'`
     - [x] Any backward compatibility impacted?
       No
     - [x] Document update required?
       Yes
     - [x] Testing done
       Please provide details on:
       - Whether new unit test cases have been added or why no new tests are required? Yes
       - How it is tested? Please attach test report.
       - Is it a performance related change? Please attach the performance test report. No
       - Any additional information to help reviewers in testing this change.
     - [x] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
       NA


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/QiangCai/carbondata streaming_handoff

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/1566.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1566
   
----
commit f3537b4bb68e7987c2dbd19800e40b865255eeb7
Author: QiangCai <[hidden email]>
Date:   2017-11-24T17:54:41Z

    support handoff streaming segment to columnar segment

----


---

[GitHub] carbondata issue #1566: [CARBONDATA-1586][Streaming] Support handoff from ro...

Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1566
 
    SDV Build Success, please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1862/



---

[GitHub] carbondata issue #1566: [CARBONDATA-1586][Streaming] Support handoff from ro...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1566
 
    Build Failed with Spark 2.1.0, please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1437/



---

[GitHub] carbondata issue #1566: [CARBONDATA-1586][Streaming] Support handoff from ro...

Github user QiangCai commented on the issue:

    https://github.com/apache/carbondata/pull/1566
 
    retest this please


---

[GitHub] carbondata issue #1566: [CARBONDATA-1586][Streaming] Support handoff from ro...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1566
 
    Build Success with Spark 2.1.0, please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1440/



---

[GitHub] carbondata issue #1566: [CARBONDATA-1586][Streaming] Support handoff from ro...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1566
 
    Build Success with Spark 2.1.0, please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1441/



---

[GitHub] carbondata issue #1566: [CARBONDATA-1586][Streaming] Support handoff from ro...

Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1566
 
    SDV Build Success, please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1863/



---

[GitHub] carbondata pull request #1566: [CARBONDATA-1586][Streaming] Support handoff ...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1566#discussion_r153092758
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java ---
    @@ -64,6 +64,10 @@ public RawResultIterator(CarbonIterator<BatchResult> detailRawQueryResultIterato
         this.destinationSegProperties = destinationSegProperties;
       }
     
    +  public RawResultIterator() {
    --- End diff --
   
    add a comment for this function


---

[GitHub] carbondata pull request #1566: [CARBONDATA-1586][Streaming] Support handoff ...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1566#discussion_r153093237
 
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java ---
    @@ -342,7 +349,11 @@ private boolean nextRow() throws IOException {
                 input.nextRow();
                 scanMore = false;
               } else {
    -            readRowFromStream();
    +            if (useRawRow) {
    +              readRawRowFromStream();
    --- End diff --
   
    add a comment to describe that this is for `handoff`, which does not require decoding the raw row


---

[GitHub] carbondata pull request #1566: [CARBONDATA-1586][Streaming] Support handoff ...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1566#discussion_r153093394
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---
    @@ -116,7 +116,7 @@ class CarbonMergerRDD[K, V](
     
             // During UPDATE DELTA COMPACTION case all the blocks received in compute belongs to
             // one segment, so max cardinality will be calculated from first block of segment
    -        if(carbonMergerMapping.campactionType == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
    +        if(CompactionType.IUD_UPDDEL_DELTA == carbonMergerMapping.campactionType) {
    --- End diff --
   
    add space after `if`


---

[GitHub] carbondata pull request #1566: [CARBONDATA-1586][Streaming] Support handoff ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1566#discussion_r153093578
 
    --- Diff: streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffUtil.scala ---
    @@ -0,0 +1,236 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.streaming
    +
    +import org.apache.spark.sql.SQLContext
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.datastore.impl.FileFactory
    +import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
    +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus,
    +SegmentStatusManager}
    +import org.apache.carbondata.core.util.path.CarbonStorePath
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.processing.util.CarbonLoaderUtil
    +import org.apache.carbondata.spark.HandoffResultImpl
    +import org.apache.carbondata.spark.util.CommonUtil
    +
    +object StreamHandoffUtil {
    +
    +  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +
    +  /**
    +   * start new thread to execute stream segment handoff
    +   */
    +  def startStreamingHandoffThread(
    --- End diff --
   
    Please move it to HandOffRDD file


---

[GitHub] carbondata pull request #1566: [CARBONDATA-1586][Streaming] Support handoff ...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1566#discussion_r153093767
 
    --- Diff: streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffUtil.scala ---
    @@ -0,0 +1,236 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.streaming
    +
    +import org.apache.spark.sql.SQLContext
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.datastore.impl.FileFactory
    +import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
    +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus,
    +SegmentStatusManager}
    +import org.apache.carbondata.core.util.path.CarbonStorePath
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.processing.util.CarbonLoaderUtil
    +import org.apache.carbondata.spark.HandoffResultImpl
    +import org.apache.carbondata.spark.util.CommonUtil
    +
    +object StreamHandoffUtil {
    +
    +  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +
    +  /**
    +   * start new thread to execute stream segment handoff
    +   */
    +  def startStreamingHandoffThread(
    +      carbonLoadModel: CarbonLoadModel,
    +      sqlContext: SQLContext,
    +      storeLocation: String
    +  ): Unit = {
    +    // start a new thread to execute streaming segment handoff
    +    val handoffThread = new Thread() {
    +      override def run(): Unit = {
    +        val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +        val identifier = carbonTable.getAbsoluteTableIdentifier
    +        val tablePath = CarbonStorePath.getCarbonTablePath(identifier)
    +        var continueHandoff = false
    +        // require handoff lock on table
    +        val lock = CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.HANDOFF_LOCK)
    +        try {
    +          if (lock.lockWithRetries()) {
    +            LOGGER.info("Acquired the handoff lock for table" +
    +                        s" ${ carbonTable.getDatabaseName }.${ carbonTable.getTableName }")
    +            // handoff streaming segment one by one
    +            do {
    +              val segmentStatusManager = new SegmentStatusManager(identifier)
    +              var loadMetadataDetails: Array[LoadMetadataDetails] = null
    +              val statusLock = segmentStatusManager.getTableStatusLock
    +              try {
    +                if (statusLock.lockWithRetries()) {
    --- End diff --
   
    add comment to describe what this lock is for
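
    For instance, the relevant lines from the diff with the kind of comment
    being requested (illustrative wording only; names are from the quoted diff):

        // Take the table status lock so the load metadata is read
        // consistently while concurrent loads or compactions update it.
        if (statusLock.lockWithRetries()) {
          loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
            tablePath.getMetadataDirectoryPath)
        }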


---

[GitHub] carbondata pull request #1566: [CARBONDATA-1586][Streaming] Support handoff ...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1566#discussion_r153093812
 
    --- Diff: streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffUtil.scala ---
    @@ -0,0 +1,236 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.streaming
    +
    +import org.apache.spark.sql.SQLContext
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.datastore.impl.FileFactory
    +import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
    +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus,
    +SegmentStatusManager}
    +import org.apache.carbondata.core.util.path.CarbonStorePath
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.processing.util.CarbonLoaderUtil
    +import org.apache.carbondata.spark.HandoffResultImpl
    +import org.apache.carbondata.spark.util.CommonUtil
    +
    +object StreamHandoffUtil {
    +
    +  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +
    +  /**
    +   * start new thread to execute stream segment handoff
    +   */
    +  def startStreamingHandoffThread(
    +      carbonLoadModel: CarbonLoadModel,
    +      sqlContext: SQLContext,
    +      storeLocation: String
    +  ): Unit = {
    +    // start a new thread to execute streaming segment handoff
    +    val handoffThread = new Thread() {
    +      override def run(): Unit = {
    +        val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +        val identifier = carbonTable.getAbsoluteTableIdentifier
    +        val tablePath = CarbonStorePath.getCarbonTablePath(identifier)
    +        var continueHandoff = false
    +        // require handoff lock on table
    +        val lock = CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.HANDOFF_LOCK)
    +        try {
    +          if (lock.lockWithRetries()) {
    +            LOGGER.info("Acquired the handoff lock for table" +
    +                        s" ${ carbonTable.getDatabaseName }.${ carbonTable.getTableName }")
    +            // handoff streaming segment one by one
    +            do {
    +              val segmentStatusManager = new SegmentStatusManager(identifier)
    +              var loadMetadataDetails: Array[LoadMetadataDetails] = null
    +              val statusLock = segmentStatusManager.getTableStatusLock
    +              try {
    +                if (statusLock.lockWithRetries()) {
    +                  loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
    +                    tablePath.getMetadataDirectoryPath)
    +                }
    +              } finally {
    +                if (null != statusLock) {
    +                  statusLock.unlock()
    +                }
    +              }
    +              if (null != loadMetadataDetails) {
    +                val streamSegments =
    +                  loadMetadataDetails.filter(_.getSegmentStatus == SegmentStatus.STREAMING_FINISH)
    +
    +                continueHandoff = streamSegments.length > 0
    +                if (continueHandoff) {
    +                  // handoff a streaming segment
    +                  val loadMetadataDetail = streamSegments(0)
    +                  executeStreamingHandoff(
    +                    carbonLoadModel,
    +                    sqlContext,
    +                    storeLocation,
    +                    loadMetadataDetail.getLoadName
    +                  )
    +                }
    +              } else {
    +                continueHandoff = false
    +              }
    +            } while (continueHandoff)
    +          }
    +        } finally {
    +          if (null != lock) {
    +            lock.unlock()
    +          }
    +        }
    +      }
    +    }
    +    handoffThread.start()
    +  }
    +
    +  /**
    +   * invoke StreamHandoffRDD to handoff streaming segment one bye one
    --- End diff --
   
    it handles only one segment, not one by one


---

[GitHub] carbondata pull request #1566: [CARBONDATA-1586][Streaming] Support handoff ...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1566#discussion_r153094199
 
    --- Diff: integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---
    @@ -849,7 +892,13 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
              | salary FLOAT
              | )
              | STORED BY 'carbondata'
    -         | TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" }
    --- End diff --
   
    I think it is not required to modify this


---

[GitHub] carbondata pull request #1566: [CARBONDATA-1586][Streaming] Support handoff ...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1566#discussion_r153094639
 
    --- Diff: streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala ---
    @@ -0,0 +1,193 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.streaming
    +
    +import java.text.SimpleDateFormat
    +import java.util
    +import java.util.Date
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType}
    +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
    +import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
    +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    +
    +import org.apache.carbondata.core.datastore.block.SegmentProperties
    +import org.apache.carbondata.core.datastore.impl.FileFactory
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
    +import org.apache.carbondata.core.util.CarbonUtil
    +import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
    +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
    +import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, CompactionType}
    +import org.apache.carbondata.spark.HandoffResult
    +import org.apache.carbondata.spark.rdd.CarbonRDD
    +
    +/**
    + * partition of the handoff segment
    + */
    +class HandoffPartition(
    +    val rddId: Int,
    +    val idx: Int,
    +    @transient val inputSplit: CarbonInputSplit
    +) extends Partition {
    +
    +  val split = new SerializableWritable[CarbonInputSplit](inputSplit)
    +
    +  override val index: Int = idx
    +
    +  override def hashCode(): Int = 41 * (41 + rddId) + idx
    +}
    +
    +/**
    + * package the record reader of the handoff segment to RawResultIterator
    + */
    +class StreamingRawResultIterator(
    +    recordReader: CarbonStreamRecordReader
    +) extends RawResultIterator {
    +
    +  override def hasNext: Boolean = {
    +    recordReader.nextKeyValue()
    +  }
    +
    +  override def next(): Array[Object] = {
    +    recordReader
    +      .getCurrentValue
    +      .asInstanceOf[GenericInternalRow]
    +      .values
    +      .asInstanceOf[Array[Object]]
    +  }
    +}
    +
    +/**
    + * execute streaming segment handoff
    + */
    +class StreamHandoffRDD[K, V](
    +    sc: SparkContext,
    +    result: HandoffResult[K, V],
    +    carbonLoadModel: CarbonLoadModel,
    +    handOffSegmentId: String
    +) extends CarbonRDD[(K, V)](sc, Nil) {
    +
    +  private val jobTrackerId: String = {
    +    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
    +    formatter.format(new Date())
    +  }
    +
    +  override def internalCompute(split: Partition,
    --- End diff --
   
    move `split` to next line
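
    That is, a sketch of the requested formatting:

        override def internalCompute(
            split: Partition,
            context: TaskContext): Iterator[(K, V)] = {
          // ... body unchanged ...
        }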


---

[GitHub] carbondata pull request #1566: [CARBONDATA-1586][Streaming] Support handoff ...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1566#discussion_r153094776
 
    --- Diff: streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala ---
    @@ -0,0 +1,193 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.streaming
    +
    +import java.text.SimpleDateFormat
    +import java.util
    +import java.util.Date
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType}
    +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
    +import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
    +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    +
    +import org.apache.carbondata.core.datastore.block.SegmentProperties
    +import org.apache.carbondata.core.datastore.impl.FileFactory
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
    +import org.apache.carbondata.core.util.CarbonUtil
    +import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
    +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
    +import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, CompactionType}
    +import org.apache.carbondata.spark.HandoffResult
    +import org.apache.carbondata.spark.rdd.CarbonRDD
    +
    +/**
    + * partition of the handoff segment
    + */
    +class HandoffPartition(
    +    val rddId: Int,
    +    val idx: Int,
    +    @transient val inputSplit: CarbonInputSplit
    +) extends Partition {
    +
    +  val split = new SerializableWritable[CarbonInputSplit](inputSplit)
    +
    +  override val index: Int = idx
    +
    +  override def hashCode(): Int = 41 * (41 + rddId) + idx
    +}
    +
    +/**
    + * package the record reader of the handoff segment to RawResultIterator
    + */
    +class StreamingRawResultIterator(
    +    recordReader: CarbonStreamRecordReader
    +) extends RawResultIterator {
    +
    +  override def hasNext: Boolean = {
    +    recordReader.nextKeyValue()
    +  }
    +
    +  override def next(): Array[Object] = {
    +    recordReader
    +      .getCurrentValue
    +      .asInstanceOf[GenericInternalRow]
    +      .values
    +      .asInstanceOf[Array[Object]]
    +  }
    +}
    +
    +/**
    + * execute streaming segment handoff
    + */
    +class StreamHandoffRDD[K, V](
    +    sc: SparkContext,
    +    result: HandoffResult[K, V],
    +    carbonLoadModel: CarbonLoadModel,
    +    handOffSegmentId: String
    +) extends CarbonRDD[(K, V)](sc, Nil) {
    +
    +  private val jobTrackerId: String = {
    +    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
    +    formatter.format(new Date())
    +  }
    +
    +  override def internalCompute(split: Partition,
    +      context: TaskContext): Iterator[(K, V)] = {
    +    carbonLoadModel.setPartitionId("0")
    +    carbonLoadModel.setTaskNo("" + split.index)
    +    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +    val iteratorList = prepareInputIterator(split, carbonTable)
    +    val processor = prepareHandoffProcessor(carbonTable)
    +    val status = processor.execute(iteratorList)
    +
    +    new Iterator[(K, V)] {
    +      private var finished = false
    +
    +      override def hasNext: Boolean = {
    +        !finished
    +      }
    +
    +      override def next(): (K, V) = {
    +        finished = true
    +        result.getKey("" + split.index, status)
    +      }
    +    }
    +  }
    +
    +  /**
    +   * prepare input iterator by basing CarbonStreamRecordReader
    +   */
    +  private def prepareInputIterator(
    +      split: Partition,
    +      carbonTable: CarbonTable
    +  ): util.ArrayList[RawResultIterator] = {
    +    val inputSplit = split.asInstanceOf[HandoffPartition].split.value
    +    val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
    +    val hadoopConf = new Configuration()
    +    CarbonTableInputFormat.setDatabaseName(hadoopConf, carbonTable.getDatabaseName)
    +    CarbonTableInputFormat.setTableName(hadoopConf, carbonTable.getTableName)
    +    CarbonTableInputFormat.setTablePath(hadoopConf, carbonTable.getTablePath)
    +    val projection = new CarbonProjection
    +    val dataFields = carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName)
    +    (0 until dataFields.size()).foreach { index =>
    +      projection.addColumn(dataFields.get(index).getColName)
    +    }
    +    CarbonTableInputFormat.setColumnProjection(hadoopConf, projection)
    +    val attemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
    +    val format = new CarbonTableInputFormat[Array[Object]]()
    +    val model = format.getQueryModel(inputSplit, attemptContext)
    +    val inputFormat = new CarbonStreamInputFormat
    +    val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
    +      .asInstanceOf[CarbonStreamRecordReader]
    +    streamReader.setVectorReader(false)
    +    streamReader.setQueryModel(model)
    +    streamReader.setUseRawRow(true)
    +    streamReader.initialize(inputSplit, attemptContext)
    +    val iteratorList = new util.ArrayList[RawResultIterator](1)
    +    iteratorList.add(new StreamingRawResultIterator(streamReader))
    +    iteratorList
    +  }
    +
    +  private def prepareHandoffProcessor(
    +      carbonTable: CarbonTable
    +  ): CompactionResultSortProcessor = {
    +    val wrapperColumnSchemaList = CarbonUtil.getColumnSchemaList(
    +      carbonTable.getDimensionByTableName(carbonTable.getTableName),
    +      carbonTable.getMeasureByTableName(carbonTable.getTableName))
    +    val dimLensWithComplex = new Array[Int](wrapperColumnSchemaList.size())
    +    for (i <- 0 until dimLensWithComplex.length) {
    --- End diff --
   
    use dimLensWithComplex.length.zipWithIndex
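
    Presumably the intent is to iterate the array itself with its index,
    e.g. (a sketch; the loop body is elided in the quoted diff):

        dimLensWithComplex.zipWithIndex.foreach { case (dimLen, i) =>
          // ... existing loop body ...
        }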


---

[GitHub] carbondata pull request #1566: [CARBONDATA-1586][Streaming] Support handoff ...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1566#discussion_r153094868
 
    --- Diff: streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala ---
    @@ -0,0 +1,193 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.streaming
    +
    +import java.text.SimpleDateFormat
    +import java.util
    +import java.util.Date
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType}
    +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
    +import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
    +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    +
    +import org.apache.carbondata.core.datastore.block.SegmentProperties
    +import org.apache.carbondata.core.datastore.impl.FileFactory
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
    +import org.apache.carbondata.core.util.CarbonUtil
    +import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
    +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
    +import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, CompactionType}
    +import org.apache.carbondata.spark.HandoffResult
    +import org.apache.carbondata.spark.rdd.CarbonRDD
    +
    +/**
    + * partition of the handoff segment
    + */
    +class HandoffPartition(
    +    val rddId: Int,
    +    val idx: Int,
    +    @transient val inputSplit: CarbonInputSplit
    +) extends Partition {
    +
    +  val split = new SerializableWritable[CarbonInputSplit](inputSplit)
    +
    +  override val index: Int = idx
    +
    +  override def hashCode(): Int = 41 * (41 + rddId) + idx
    +}
    +
    +/**
    + * package the record reader of the handoff segment to RawResultIterator
    + */
    +class StreamingRawResultIterator(
    +    recordReader: CarbonStreamRecordReader
    +) extends RawResultIterator {
    +
    +  override def hasNext: Boolean = {
    +    recordReader.nextKeyValue()
    +  }
    +
    +  override def next(): Array[Object] = {
    +    recordReader
    +      .getCurrentValue
    +      .asInstanceOf[GenericInternalRow]
    +      .values
    +      .asInstanceOf[Array[Object]]
    +  }
    +}
    +
    +/**
    + * execute streaming segment handoff
    + */
    +class StreamHandoffRDD[K, V](
    +    sc: SparkContext,
    +    result: HandoffResult[K, V],
    +    carbonLoadModel: CarbonLoadModel,
    +    handOffSegmentId: String
    +) extends CarbonRDD[(K, V)](sc, Nil) {
    +
    +  private val jobTrackerId: String = {
    +    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
    +    formatter.format(new Date())
    +  }
    +
    +  override def internalCompute(split: Partition,
    +      context: TaskContext): Iterator[(K, V)] = {
    +    carbonLoadModel.setPartitionId("0")
    +    carbonLoadModel.setTaskNo("" + split.index)
    +    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +    val iteratorList = prepareInputIterator(split, carbonTable)
    --- End diff --
   
    add a comment to describe that it is using raw rows
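
    For example (illustrative wording only):

        // the iterators return raw (un-decoded) rows, since the stream
        // reader is created with setUseRawRow(true)
        val iteratorList = prepareInputIterator(split, carbonTable)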


---

[GitHub] carbondata pull request #1566: [CARBONDATA-1586][Streaming] Support handoff ...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1566#discussion_r153094993
 
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java ---
    @@ -196,9 +196,16 @@ private void deleteTempStoreLocation() {
        */
       private void processResult(List<RawResultIterator> resultIteratorList) throws Exception {
         for (RawResultIterator resultIterator : resultIteratorList) {
    -      while (resultIterator.hasNext()) {
    -        addRowForSorting(prepareRowObjectForSorting(resultIterator.next()));
    -        isRecordFound = true;
    +      if (CompactionType.STREAMING == compactionType) {
    +        while (resultIterator.hasNext()) {
    +          addRowForSorting(resultIterator.next());
    --- End diff --
   
    add a comment to describe that it is using raw rows


---

[GitHub] carbondata issue #1566: [CARBONDATA-1586][Streaming] Support handoff from ro...

Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1566
 
    Build Failed with Spark 2.1.0, please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1474/



---

[GitHub] carbondata issue #1566: [CARBONDATA-1586][Streaming] Support handoff from ro...

Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1566
 
    SDV Build Success, please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1893/



---