GitHub user xuchuanyin opened a pull request:
https://github.com/apache/carbondata/pull/1261

[CARBONDATA-1373] Enhance update performance by increasing parallelism

# Scenario

Recently I tested the update feature provided in CarbonData and found that its performance was poor. I had a table containing about 14 million records with about 370 columns (no dictionary columns), and the data files were about 3.8 GB in total, all in one segment. I ran an update SQL that updated a column for all the records, like `UPDATE myTable SET (col1)=(col1+1000) WHERE TRUE`. In my environment, the update job failed with 'executor lost' errors, and I found 'spill data' related messages in the container logs.

# Analysis

I have read about the implementation of update/delete in CarbonData in ISSUE#440. An update consists of a delete and an insert operation, and the error occurred during the insert. After studying the code, I found that during the insert the updated records are grouped by `segmentId`, which means all the records in one segment are processed in a single task; this causes task failure when the amount of input data is quite large.

# Solution

We should improve the parallelism when updating a segment. I append a random key to the `segmentId` to increase the partition number before the insertion stage, and then remove the suffix when doing the real insertion (a sketch of the partitioner follows this description).

# Modification

+ Increase parallelism while processing one segment in update; this is achieved by distributing records to different partitions using a customized partitioner.
+ Add a property to configure the parallelism.
+ Clean up local files after update (previous bugs).
+ Remove useless imports.
+ Add tests.
+ Add related documents.

# Notes

I have tested my example and the job finished successfully in about 13 minutes, with the records updated as expected. Compared to the previous implementation, the update performance has been enhanced:

+ Origin (Parallelism(1) + GroupBy): update **FAILED**
+ With Parallelism(6) + GroupBy: update **SUCCEEDED** in **13 mins**
+ With Parallelism(1) + PartitionBy: update **SUCCEEDED** in **21 mins**
+ With Parallelism(6) + PartitionBy: update **SUCCEEDED** in **5 mins**

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xuchuanyin/carbondata enhance_update_perf

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/1261.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1261

----

commit ebfe1ca2a125c0b736e917d8a7956f5e39dedc50
Author: xuchuanyin <[hidden email]>
Date: 2017-08-11T15:00:20Z

    Enhance update performance by increasing parallelism

    + Increase parallelism while processing one segment in update
    + Use partitionBy instead of groupBy
    + Add a property to configure the parallelism
    + Clean up local files after update (previous bugs)
    + Remove useless imports

----
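The partitioning approach described in the Solution section can be sketched as a custom Spark `Partitioner`. This is a minimal sketch mirroring the `SegmentPartitioner` shown in the review diff later in this thread, not necessarily the exact merged code:

```scala
import scala.util.Random
import org.apache.spark.Partitioner

// Each segment owns `parallelism` consecutive partition slots; a row keyed by
// its segment id is routed to a random slot within that segment's range, so
// one segment's rows are spread across several tasks instead of just one.
class SegmentPartitioner(segIdIndex: Map[String, Int], parallelism: Int)
  extends Partitioner {

  override def numPartitions: Int = segIdIndex.size * parallelism

  override def getPartition(key: Any): Int = {
    val segId = key.asInstanceOf[String]
    // base slot for this segment + a random offset below `parallelism`;
    // the segment index is recoverable as partitionId / parallelism
    segIdIndex(segId) * parallelism + Random.nextInt(parallelism)
  }
}
```

Rows keyed by segment id are then redistributed with `keyRDD.partitionBy(new SegmentPartitioner(segmentIdIndex, parallelism))`, after which each partition holds rows from exactly one segment and can be loaded independently.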
Github user xuchuanyin commented on the issue:
https://github.com/apache/carbondata/pull/1261

The previous PR #1253 is closed; use this new PR instead.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1261 SDV Build Failed with Spark 2.1, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/195/
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1261 SDV Build Success with Spark 2.1, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/205/
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1261 SDV Build Failed with Spark 2.1, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/207/
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1261 SDV Build Success with Spark 2.1, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/210/
Github user xuchuanyin commented on the issue:
https://github.com/apache/carbondata/pull/1261

Test failed for `DataMapWriterSuite.scala:123` with message:

List("blocklet start 0", "add page data: blocklet 0, page 0", "add page data: blocklet 0, page 1", "add page data: blocklet 0, page 2", "add page data: blocklet 0, page 3", "add page data: blocklet 0, page 4", "add page data: blocklet 0, page 5", "add page data: blocklet 0, page 6", "add page data: blocklet 0, page 7", "add page data: blocklet 0, page 8", "add page data: blocklet 0, page 9", "blocklet end: 0")

did not equal

List("blocklet start 0", "add page data: blocklet 0, page 0", "add page data: blocklet 0, page 1", "add page data: blocklet 0, page 2", "add page data: blocklet 0, page 3", "add page data: blocklet 0, page 4", "add page data: blocklet 0, page 5", "add page data: blocklet 0, page 6", "add page data: blocklet 0, page 7", "blocklet end: 0", "blocklet start 1", "add page data: blocklet 1, page 0", "add page data: blocklet 1, page 1", "blocklet end: 1")

The test expected 2 blocklets (10 pages in total: 8 in the first, 2 in the second), but the actual result is 1 blocklet containing all 10 pages. Is this an intermittent issue or a real problem?
Github user xuchuanyin commented on the issue:
https://github.com/apache/carbondata/pull/1261 retest this
Github user xuchuanyin commented on the issue:
https://github.com/apache/carbondata/pull/1261 retest this please
Github user xuchuanyin commented on the issue:
https://github.com/apache/carbondata/pull/1261

@sraghunandan @jackylk please review this PR. Also, please pay attention to the previous comment https://github.com/apache/carbondata/pull/1261#issuecomment-323000513: **there may be some errors in the tests**. I just retested the PR without modifying the code, and the tests passed.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1261 SDV Build Success with Spark 2.1, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/213/
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1261#discussion_r133689326

--- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---

@@ -657,26 +652,53 @@ object CarbonDataRDDFactory {
       val updateRdd = dataFrame.get.rdd

+      // return directly if no rows to update
+      val noRowsToUpdate = updateRdd.isEmpty()
+      if (noRowsToUpdate) {
+        res = Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]]()
+        return
+      }
+
+      // splitting as (key, value) i.e., (segment, updatedRows)
       val keyRDD = updateRdd.map(row =>
-        // splitting as (key, value) i.e., (segment, updatedRows)
-        (row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 1): _*))
-      )
-      val groupBySegmentRdd = keyRDD.groupByKey()
+        (row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 1): _*)))
+
+      val loadMetadataDetails = SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetaDataFilepath)
+      val segmentIds = loadMetadataDetails.map(_.getLoadName)
+      val segmentIdIndex = segmentIds.zipWithIndex.toMap
+      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonLoadModel.getStorePath,
+        carbonTable.getCarbonTableIdentifier)
+      val segmentId2maxTaskNo = segmentIds
+        .map(segId =>
+          (segId, CarbonUpdateUtil.getLatestTaskIdForSegment(segId, carbonTablePath)))
+        .toMap
+
+      class SegmentPartitioner(segIdIndex: Map[String, Int], parallelism: Int)
+        extends org.apache.spark.Partitioner {
+        override def numPartitions: Int = segmentIdIndex.size * parallelism
+
+        override def getPartition(key: Any): Int = {
+          val segId = key.asInstanceOf[String]
+          // partitionId
+          segmentIdIndex(segId) * parallelism + Random.nextInt(parallelism)
+        }
+      }

-      val nodeNumOfData = groupBySegmentRdd.partitions.flatMap[String, Array[String]] { p =>
-        DataLoadPartitionCoalescer.getPreferredLocs(groupBySegmentRdd, p).map(_.host)
-      }.distinct.size
-      val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData,
-        sqlContext.sparkContext)
-      val groupBySegmentAndNodeRdd =
-        new UpdateCoalescedRDD[(String, scala.Iterable[Row])](groupBySegmentRdd,
-          nodes.distinct.toArray)
+      val partitionByRdd = keyRDD
+        .partitionBy(new SegmentPartitioner(segmentIdIndex, segmentUpdateParallelism))

-      res = groupBySegmentAndNodeRdd.map(x =>
-        triggerDataLoadForSegment(x._1, x._2.toIterator).toList
-      ).collect()
+      // because partitionId=segmentIdIndex*parallelism+RandomPart and RandomPart<parallelism,
+      // so segmentIdIndex=partitionId/parallelism, this has been verified.
+      res = partitionByRdd.map(_._2).mapPartitions(p => {

--- End diff ---

change `.mapPartitions(p => {` to `.mapPartitions { partition =>`
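For reference, here is the style difference the comment asks for, shown on plain Scala collections so it is self-contained (a minimal illustration; the names are made up, and the PR applies the same change to the RDD's `mapPartitions`):

```scala
object BraceStyleExample extends App {
  val rows = List(1, 2, 3, 4)

  // Parenthesis-wrapping-a-brace style flagged in the review:
  val doubledA = rows.map(x => {
    x * 2
  })

  // Idiomatic brace style for multi-line lambdas, as the reviewer requests:
  val doubledB = rows.map { x =>
    x * 2
  }

  assert(doubledA == doubledB) // same behavior, different style
}
```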
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1261#discussion_r133689638

--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---

@@ -756,26 +751,53 @@ object CarbonDataRDDFactory {
       val updateRdd = dataFrame.get.rdd

+      // return directly if no rows to update
+      val noRowsToUpdate = updateRdd.isEmpty()
+      if (noRowsToUpdate) {
+        res = Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]]()
+        return
+      }
+
+      // splitting as (key, value) i.e., (segment, updatedRows)
       val keyRDD = updateRdd.map(row =>
-        // splitting as (key, value) i.e., (segment, updatedRows)
-        (row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 1): _*))
-      )
-      val groupBySegmentRdd = keyRDD.groupByKey()
+        (row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 1): _*)))
+
+      val loadMetadataDetails = SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetaDataFilepath)
+      val segmentIds = loadMetadataDetails.map(_.getLoadName)
+      val segmentIdIndex = segmentIds.zipWithIndex.toMap
+      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonLoadModel.getStorePath,
+        carbonTable.getCarbonTableIdentifier)
+      val segmentId2maxTaskNo = segmentIds
+        .map(segId =>

--- End diff ---

move to previous line, and change style to `.map{ segId =>`
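Applied to the snippet above, the requested change would read roughly as follows. This is a sketch: `getLatestTaskId` is a hypothetical stand-in for the `CarbonUpdateUtil.getLatestTaskIdForSegment` call, so the example is self-contained:

```scala
object MapBraceStyle {
  // Hypothetical stand-in for CarbonUpdateUtil.getLatestTaskIdForSegment
  def getLatestTaskId(segId: String): Long = segId.length.toLong // dummy logic

  val segmentIds = Seq("0", "1", "2")

  // Reviewer's preferred style: open the brace on the same line as `.map`
  val segmentId2maxTaskNo: Map[String, Long] = segmentIds.map { segId =>
    (segId, getLatestTaskId(segId))
  }.toMap
}
```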
Github user xuchuanyin commented on the issue:
https://github.com/apache/carbondata/pull/1261

@jackylk thanks, the review comments are fixed.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1261 SDV Build Success with Spark 2.1, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/222/
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1261 SDV Build Success with Spark 2.1, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/224/
Github user jackylk commented on the issue:
https://github.com/apache/carbondata/pull/1261 LGTM
Github user asfgit closed the pull request at:
https://github.com/apache/carbondata/pull/1261