GitHub user QiangCai opened a pull request:
https://github.com/apache/carbondata/pull/1566

[CARBONDATA-1586][Streaming] Support handoff from row format to columnar format

1. configuration: carbon.handoff.size => carbon.streaming.segment.max.size
2. SQL command: alter table <table_name> compact 'streaming'
3. refactor CompactionType

- [x] Any interfaces changed? yes, alter table compact 'streaming'
- [x] Any backward compatibility impacted? no
- [x] Document update required? yes
- [x] Testing done. Please provide details on
  - Whether new unit test cases have been added or why no new tests are required? yes
  - How it is tested? Please attach test report.
  - Is it a performance related change? Please attach the performance test report. no
  - Any additional information to help reviewers in testing this change.
- [x] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. NA

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/QiangCai/carbondata streaming_handoff

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/1566.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1566

----

commit f3537b4bb68e7987c2dbd19800e40b865255eeb7
Author: QiangCai <[hidden email]>
Date: 2017-11-24T17:54:41Z

    support handoff streaming segment to columnar segment

---
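For reviewers who want to try the change locally, a minimal sketch of how the renamed property and the new command might be exercised from a Carbon-enabled Spark shell (`spark` is assumed to be a CarbonSession). The table name `stream_table` and the size value are hypothetical; the property key and SQL syntax come from the description above.

    import org.apache.carbondata.core.util.CarbonProperties

    // Size threshold at which a streaming segment becomes eligible for handoff
    // (the property that replaces carbon.handoff.size); the value here is an
    // arbitrary example, not a recommendation.
    CarbonProperties.getInstance()
      .addProperty("carbon.streaming.segment.max.size", "1024000000")

    // Hand off finished streaming segments of the table to the columnar format.
    spark.sql("ALTER TABLE stream_table COMPACT 'streaming'")

---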
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1566 SDV Build Success, please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1862/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1566 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1437/ ---
Github user QiangCai commented on the issue:
https://github.com/apache/carbondata/pull/1566 retest this please ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1566 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1440/ ---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1566 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1441/ ---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1566 SDV Build Success, please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1863/ ---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1566#discussion_r153092758

--- Diff: core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java ---
@@ -64,6 +64,10 @@ public RawResultIterator(CarbonIterator<BatchResult> detailRawQueryResultIterato
     this.destinationSegProperties = destinationSegProperties;
   }

+  public RawResultIterator() {
--- End diff --

add comment for this function

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1566#discussion_r153093237

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java ---
@@ -342,7 +349,11 @@ private boolean nextRow() throws IOException {
       input.nextRow();
       scanMore = false;
     } else {
-      readRowFromStream();
+      if (useRawRow) {
+        readRawRowFromStream();
--- End diff --

add a comment to describe that this is for `handoff`, which does not need to decode the raw row

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1566#discussion_r153093394

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---
@@ -116,7 +116,7 @@ class CarbonMergerRDD[K, V](
     // During UPDATE DELTA COMPACTION case all the blocks received in compute belongs to
     // one segment, so max cardinality will be calculated from first block of segment
-    if(carbonMergerMapping.campactionType == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
+    if(CompactionType.IUD_UPDDEL_DELTA == carbonMergerMapping.campactionType) {
--- End diff --

add space after `if`

---
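For clarity, the same line with the requested whitespace fix applied (the field name `campactionType` is kept exactly as it appears in the diff):

    if (CompactionType.IUD_UPDDEL_DELTA == carbonMergerMapping.campactionType) {

---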
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1566#discussion_r153093578 --- Diff: streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffUtil.scala --- @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.streaming + +import org.apache.spark.sql.SQLContext + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage} +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, +SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.processing.util.CarbonLoaderUtil +import org.apache.carbondata.spark.HandoffResultImpl +import org.apache.carbondata.spark.util.CommonUtil + +object StreamHandoffUtil { + + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + /** + * start new thread to execute stream segment handoff + */ + def startStreamingHandoffThread( --- End diff -- Please move it to HandOffRDD file --- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1566#discussion_r153093767 --- Diff: streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffUtil.scala --- @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.streaming + +import org.apache.spark.sql.SQLContext + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage} +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, +SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.processing.util.CarbonLoaderUtil +import org.apache.carbondata.spark.HandoffResultImpl +import org.apache.carbondata.spark.util.CommonUtil + +object StreamHandoffUtil { + + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + /** + * start new thread to execute stream segment handoff + */ + def startStreamingHandoffThread( + carbonLoadModel: CarbonLoadModel, + sqlContext: SQLContext, + storeLocation: String + ): Unit = { + // start a new thread to execute streaming segment handoff + val handoffThread = new Thread() { + override def run(): Unit = { + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val identifier = carbonTable.getAbsoluteTableIdentifier + val tablePath = CarbonStorePath.getCarbonTablePath(identifier) + var continueHandoff = false + // require handoff lock on table + val lock = CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.HANDOFF_LOCK) + try { + if (lock.lockWithRetries()) { + LOGGER.info("Acquired the handoff lock for table" + + s" ${ carbonTable.getDatabaseName }.${ carbonTable.getTableName }") + // handoff streaming segment one by one + do { + val segmentStatusManager = new SegmentStatusManager(identifier) + var loadMetadataDetails: Array[LoadMetadataDetails] = null + val statusLock = segmentStatusManager.getTableStatusLock + try { + if (statusLock.lockWithRetries()) { --- End diff -- add comment to describe what this lock is for --- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1566#discussion_r153093812 --- Diff: streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffUtil.scala --- @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.streaming + +import org.apache.spark.sql.SQLContext + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage} +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, +SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.processing.util.CarbonLoaderUtil +import org.apache.carbondata.spark.HandoffResultImpl +import org.apache.carbondata.spark.util.CommonUtil + +object StreamHandoffUtil { + + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + /** + * start new thread to execute stream segment handoff + */ + def startStreamingHandoffThread( + carbonLoadModel: CarbonLoadModel, + sqlContext: SQLContext, + storeLocation: String + ): Unit = { + // start a new thread to execute streaming segment handoff + val handoffThread = new Thread() { + override def run(): Unit = { + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val identifier = carbonTable.getAbsoluteTableIdentifier + val tablePath = CarbonStorePath.getCarbonTablePath(identifier) + var continueHandoff = false + // require handoff lock on table + val lock = CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.HANDOFF_LOCK) + try { + if (lock.lockWithRetries()) { + LOGGER.info("Acquired the handoff lock for table" + + s" ${ carbonTable.getDatabaseName }.${ carbonTable.getTableName }") + // handoff streaming segment one by one + do { + val segmentStatusManager = new SegmentStatusManager(identifier) + var loadMetadataDetails: Array[LoadMetadataDetails] = null + val statusLock = segmentStatusManager.getTableStatusLock + try { + if (statusLock.lockWithRetries()) { + loadMetadataDetails = SegmentStatusManager.readLoadMetadata( + tablePath.getMetadataDirectoryPath) + } + } finally { + if (null != statusLock) { + statusLock.unlock() + } + } + if (null != loadMetadataDetails) { + val streamSegments = + loadMetadataDetails.filter(_.getSegmentStatus == SegmentStatus.STREAMING_FINISH) + + continueHandoff = streamSegments.length > 0 + if (continueHandoff) { + // handoff a streaming segment + val loadMetadataDetail = streamSegments(0) + executeStreamingHandoff( + carbonLoadModel, + sqlContext, + storeLocation, + loadMetadataDetail.getLoadName + ) + } + } else { + continueHandoff = 
false
+            }
+          } while (continueHandoff)
+        }
+      } finally {
+        if (null != lock) {
+          lock.unlock()
+        }
+      }
+    }
+  }
+  handoffThread.start()
+}
+
+/**
+ * invoke StreamHandoffRDD to handoff streaming segment one by one
--- End diff --

it is one only (the method hands off a single segment)

---
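A possible correction of the quoted doc comment in response to this review (wording is a suggestion only):

    /**
     * invoke StreamHandoffRDD to hand off a single streaming segment
     */

---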
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1566#discussion_r153094199

--- Diff: integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---
@@ -849,7 +892,13 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
         | salary FLOAT
         | )
         | STORED BY 'carbondata'
-        | TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" }
--- End diff --

I think it is not required to modify this

---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1566#discussion_r153094639 --- Diff: streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala --- @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.streaming + +import java.text.SimpleDateFormat +import java.util +import java.util.Date + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType} +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext} +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow + +import org.apache.carbondata.core.datastore.block.SegmentProperties +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.scan.result.iterator.RawResultIterator +import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection} +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat +import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader} +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, CompactionType} +import org.apache.carbondata.spark.HandoffResult +import org.apache.carbondata.spark.rdd.CarbonRDD + +/** + * partition of the handoff segment + */ +class HandoffPartition( + val rddId: Int, + val idx: Int, + @transient val inputSplit: CarbonInputSplit +) extends Partition { + + val split = new SerializableWritable[CarbonInputSplit](inputSplit) + + override val index: Int = idx + + override def hashCode(): Int = 41 * (41 + rddId) + idx +} + +/** + * package the record reader of the handoff segment to RawResultIterator + */ +class StreamingRawResultIterator( + recordReader: CarbonStreamRecordReader +) extends RawResultIterator { + + override def hasNext: Boolean = { + recordReader.nextKeyValue() + } + + override def next(): Array[Object] = { + recordReader + .getCurrentValue + .asInstanceOf[GenericInternalRow] + .values + .asInstanceOf[Array[Object]] + } +} + +/** + * execute streaming segment handoff + */ +class StreamHandoffRDD[K, V]( + sc: SparkContext, + result: HandoffResult[K, V], + carbonLoadModel: CarbonLoadModel, + handOffSegmentId: String +) extends CarbonRDD[(K, V)](sc, Nil) { + + private val jobTrackerId: String = { + val formatter = new SimpleDateFormat("yyyyMMddHHmm") + formatter.format(new Date()) + } + + override def internalCompute(split: Partition, --- End diff -- move `split` to next line --- |
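The requested formatting, with each parameter of the quoted signature on its own line:

    override def internalCompute(
        split: Partition,
        context: TaskContext): Iterator[(K, V)] = {

---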
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1566#discussion_r153094776 --- Diff: streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala --- @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.streaming + +import java.text.SimpleDateFormat +import java.util +import java.util.Date + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType} +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext} +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow + +import org.apache.carbondata.core.datastore.block.SegmentProperties +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.scan.result.iterator.RawResultIterator +import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection} +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat +import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader} +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, CompactionType} +import org.apache.carbondata.spark.HandoffResult +import org.apache.carbondata.spark.rdd.CarbonRDD + +/** + * partition of the handoff segment + */ +class HandoffPartition( + val rddId: Int, + val idx: Int, + @transient val inputSplit: CarbonInputSplit +) extends Partition { + + val split = new SerializableWritable[CarbonInputSplit](inputSplit) + + override val index: Int = idx + + override def hashCode(): Int = 41 * (41 + rddId) + idx +} + +/** + * package the record reader of the handoff segment to RawResultIterator + */ +class StreamingRawResultIterator( + recordReader: CarbonStreamRecordReader +) extends RawResultIterator { + + override def hasNext: Boolean = { + recordReader.nextKeyValue() + } + + override def next(): Array[Object] = { + recordReader + .getCurrentValue + .asInstanceOf[GenericInternalRow] + .values + .asInstanceOf[Array[Object]] + } +} + +/** + * execute streaming segment handoff + */ +class StreamHandoffRDD[K, V]( + sc: SparkContext, + result: HandoffResult[K, V], + carbonLoadModel: CarbonLoadModel, + handOffSegmentId: String +) extends CarbonRDD[(K, V)](sc, Nil) { + + private val jobTrackerId: String = { + val formatter = new SimpleDateFormat("yyyyMMddHHmm") + formatter.format(new Date()) + } + + override def internalCompute(split: Partition, + context: TaskContext): Iterator[(K, V)] = { + carbonLoadModel.setPartitionId("0") + 
carbonLoadModel.setTaskNo("" + split.index) + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val iteratorList = prepareInputIterator(split, carbonTable) + val processor = prepareHandoffProcessor(carbonTable) + val status = processor.execute(iteratorList) + + new Iterator[(K, V)] { + private var finished = false + + override def hasNext: Boolean = { + !finished + } + + override def next(): (K, V) = { + finished = true + result.getKey("" + split.index, status) + } + } + } + + /** + * prepare input iterator by basing CarbonStreamRecordReader + */ + private def prepareInputIterator( + split: Partition, + carbonTable: CarbonTable + ): util.ArrayList[RawResultIterator] = { + val inputSplit = split.asInstanceOf[HandoffPartition].split.value + val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0) + val hadoopConf = new Configuration() + CarbonTableInputFormat.setDatabaseName(hadoopConf, carbonTable.getDatabaseName) + CarbonTableInputFormat.setTableName(hadoopConf, carbonTable.getTableName) + CarbonTableInputFormat.setTablePath(hadoopConf, carbonTable.getTablePath) + val projection = new CarbonProjection + val dataFields = carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName) + (0 until dataFields.size()).foreach { index => + projection.addColumn(dataFields.get(index).getColName) + } + CarbonTableInputFormat.setColumnProjection(hadoopConf, projection) + val attemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId) + val format = new CarbonTableInputFormat[Array[Object]]() + val model = format.getQueryModel(inputSplit, attemptContext) + val inputFormat = new CarbonStreamInputFormat + val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext) + .asInstanceOf[CarbonStreamRecordReader] + streamReader.setVectorReader(false) + streamReader.setQueryModel(model) + streamReader.setUseRawRow(true) + streamReader.initialize(inputSplit, attemptContext) + val iteratorList = new util.ArrayList[RawResultIterator](1) + iteratorList.add(new StreamingRawResultIterator(streamReader)) + iteratorList + } + + private def prepareHandoffProcessor( + carbonTable: CarbonTable + ): CompactionResultSortProcessor = { + val wrapperColumnSchemaList = CarbonUtil.getColumnSchemaList( + carbonTable.getDimensionByTableName(carbonTable.getTableName), + carbonTable.getMeasureByTableName(carbonTable.getTableName)) + val dimLensWithComplex = new Array[Int](wrapperColumnSchemaList.size()) + for (i <- 0 until dimLensWithComplex.length) { --- End diff -- use dimLensWithComplex.length.zipWithIndex --- |
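Note that the suggestion as written would not compile: `dimLensWithComplex.length` is an `Int`, which has no `zipWithIndex`. The likely intent is to iterate the array's indices (or element/index pairs) directly. A sketch of the idiomatic form, assuming the loop body only needs the index:

    // Equivalent to `for (i <- 0 until dimLensWithComplex.length)`, but idiomatic.
    for (i <- dimLensWithComplex.indices) {
      // ... original loop body using dimLensWithComplex(i) ...
    }

---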
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1566#discussion_r153094868 --- Diff: streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala --- @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.streaming + +import java.text.SimpleDateFormat +import java.util +import java.util.Date + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType} +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext} +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow + +import org.apache.carbondata.core.datastore.block.SegmentProperties +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.scan.result.iterator.RawResultIterator +import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection} +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat +import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader} +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, CompactionType} +import org.apache.carbondata.spark.HandoffResult +import org.apache.carbondata.spark.rdd.CarbonRDD + +/** + * partition of the handoff segment + */ +class HandoffPartition( + val rddId: Int, + val idx: Int, + @transient val inputSplit: CarbonInputSplit +) extends Partition { + + val split = new SerializableWritable[CarbonInputSplit](inputSplit) + + override val index: Int = idx + + override def hashCode(): Int = 41 * (41 + rddId) + idx +} + +/** + * package the record reader of the handoff segment to RawResultIterator + */ +class StreamingRawResultIterator( + recordReader: CarbonStreamRecordReader +) extends RawResultIterator { + + override def hasNext: Boolean = { + recordReader.nextKeyValue() + } + + override def next(): Array[Object] = { + recordReader + .getCurrentValue + .asInstanceOf[GenericInternalRow] + .values + .asInstanceOf[Array[Object]] + } +} + +/** + * execute streaming segment handoff + */ +class StreamHandoffRDD[K, V]( + sc: SparkContext, + result: HandoffResult[K, V], + carbonLoadModel: CarbonLoadModel, + handOffSegmentId: String +) extends CarbonRDD[(K, V)](sc, Nil) { + + private val jobTrackerId: String = { + val formatter = new SimpleDateFormat("yyyyMMddHHmm") + formatter.format(new Date()) + } + + override def internalCompute(split: Partition, + context: TaskContext): Iterator[(K, V)] = { + carbonLoadModel.setPartitionId("0") + 
carbonLoadModel.setTaskNo("" + split.index)
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    val iteratorList = prepareInputIterator(split, carbonTable)
--- End diff --

add comment to describe it is using raw row

---
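One possible version of the requested comment over the quoted line (illustrative wording):

    // prepareInputIterator returns iterators over raw, un-decoded rows read from
    // the streaming segment; handoff sorts and re-encodes them into the columnar
    // format anyway, so decoding here would be wasted work.
    val iteratorList = prepareInputIterator(split, carbonTable)

---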
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1566#discussion_r153094993

--- Diff: processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java ---
@@ -196,9 +196,16 @@ private void deleteTempStoreLocation() {
    */
   private void processResult(List<RawResultIterator> resultIteratorList) throws Exception {
     for (RawResultIterator resultIterator : resultIteratorList) {
-      while (resultIterator.hasNext()) {
-        addRowForSorting(prepareRowObjectForSorting(resultIterator.next()));
-        isRecordFound = true;
+      if (CompactionType.STREAMING == compactionType) {
+        while (resultIterator.hasNext()) {
+          addRowForSorting(resultIterator.next());
--- End diff --

add comment to describe it is using raw row

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1566 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1474/ ---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1566 SDV Build Success, please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1893/ ---