GitHub user ravipesala opened a pull request:

    https://github.com/apache/carbondata/pull/1675

    Partition compaction

    This PR depends on https://github.com/apache/carbondata/pull/1654, https://github.com/apache/carbondata/pull/1672 and https://github.com/apache/carbondata/pull/1674. It supports compaction on partition tables.

    Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily:

     - [X] Any interfaces changed?
     - [X] Any backward compatibility impacted? NO
     - [X] Document update required? Yes
     - [X] Testing done
           Tests added
     - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ravipesala/incubator-carbondata partition-compaction

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/1675.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1675

----
commit 6a02bfe53117ddae5494e60b112b406841611bc3
Author: ravipesala <[hidden email]>
Date:   2017-12-04T10:37:03Z

    Added outputformat for carbon

commit 3e8b38a99d519147410fe8a9a37be1692424ad64
Author: ravipesala <[hidden email]>
Date:   2017-12-12T06:12:45Z

    Added fileformat in carbon

commit df195346df12ee1af952d7a5e35ab79a012d346c
Author: ravipesala <[hidden email]>
Date:   2017-12-15T19:18:19Z

    Added support to query using standard partitions

commit d357b6d435c4c7f48a8b367c7432b8975a426cd1
Author: ravipesala <[hidden email]>
Date:   2017-12-16T17:08:00Z

    Added drop partition feature

commit ca467e06e5d6a30f5fe5e613c209494085788445
Author: ravipesala <[hidden email]>
Date:   2017-12-17T05:39:34Z

    Support compaction for partition tables

----
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1675

    Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/850/

---
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1675

    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2075/

---
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1675

    SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2376/

---
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1675

    Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/916/

---
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1675

    SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2434/

---
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1675

    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2148/

---
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1675#discussion_r157951533

    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---
    @@ -428,7 +438,11 @@ class CarbonMergerRDD[K, V](
             carbonPartitionId = Integer.parseInt(taskInfo.getTaskId)
           }
           result.add(
    -        new CarbonSparkPartition(id, taskPartitionNo, multiBlockSplit, carbonPartitionId))
    +        new CarbonSparkPartition(id,

    --- End diff --

    move id to next line

---
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1675#discussion_r157951789

    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---
    @@ -456,6 +470,33 @@ class CarbonMergerRDD[K, V](
         result.toArray(new Array[Partition](result.size))
       }

    +  private def getTaskNo(
    +      split: CarbonInputSplit,
    +      partitionTaskMap: util.Map[List[String], String]): String = {
    +    if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isStandardPartitionTable) {

    --- End diff --

    I remember there is a comment in an earlier PR to rename `isStandardPartitionTable` to `isHivePartitionTable`; was it not fixed?

---
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1675#discussion_r157952029

    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---
    @@ -722,8 +723,8 @@ object CarbonDataRDDFactory {
           val compactionModel = CompactionModel(compactionSize,

    --- End diff --

    move compactionSize to next line

---
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1675#discussion_r157952050

    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---
    @@ -211,8 +212,8 @@ object CarbonDataRDDFactory {
           val newcompactionModel = CompactionModel(compactionSize,

    --- End diff --

    move compactionSize to next line

---
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1675#discussion_r157952111

    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala ---
    @@ -130,6 +136,24 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
           val validSegments: Array[String] = CarbonDataMergerUtil
             .getValidSegments(loadsToMerge).split(',')
           val mergeLoadStartTime = CarbonUpdateUtil.readCurrentTime()
    +      val partitionMapper = if (carbonTable.isStandardPartitionTable) {
    +        var partitionMap: util.Map[String, util.List[String]] = null
    +        validSegments.foreach { segmentId =>
    +          val localMapper = new PartitionFileStore()
    +          localMapper.readAllPartitionsOfSegment(
    +            CarbonTablePath.getSegmentPath(carbonLoadModel.getTablePath, segmentId))
    +          if (partitionMap == null) {
    +            partitionMap = localMapper.getPartitionMap
    +          } else {
    +            partitionMap.putAll(localMapper.getPartitionMap)
    +          }
    +        }
    +        val mapper = new PartitionMapper()
    +        mapper.setPartitionMap(partitionMap)
    +        mapper
    +      } else {
    +        null
    +      }
           val carbonMergerMapping = CarbonMergerMapping(tablePath,

    --- End diff --

    move tablePath to next line

---
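The diff under discussion collects the partition metadata of every segment being compacted by folding per-segment maps into one, using a null check for the first segment and `putAll` for the rest (so a later segment silently overwrites entries with the same key). A minimal standalone Java sketch of that fold, with a hypothetical `readSegment` stand-in for the real `PartitionFileStore` reads:

```java
import java.util.*;

public class PartitionMapMerge {
    // Hypothetical stand-in for PartitionFileStore.getPartitionMap():
    // maps a task/index-file name to the partition values it covers.
    static Map<String, List<String>> readSegment(String segmentId) {
        Map<String, List<String>> m = new HashMap<>();
        m.put("task_" + segmentId, Arrays.asList("country=IN", "year=2017"));
        return m;
    }

    // Fold all per-segment maps into one, mirroring the diff's
    // null-check + putAll pattern (later segments overwrite same keys).
    static Map<String, List<String>> mergeSegments(List<String> segments) {
        Map<String, List<String>> merged = null;
        for (String segmentId : segments) {
            Map<String, List<String>> local = readSegment(segmentId);
            if (merged == null) {
                merged = local;
            } else {
                merged.putAll(local);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, List<String>> merged = mergeSegments(Arrays.asList("0", "1", "2"));
        System.out.println(merged.keySet()); // one key per segment here
    }
}
```

Note the overwrite-on-duplicate semantics of `putAll`: this is only safe if task keys are unique across the segments being merged, which the patch appears to rely on.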
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1675#discussion_r157952318

    --- Diff: processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java ---
    @@ -168,6 +174,18 @@ public boolean execute(List<RawResultIterator> resultIteratorList) {
         } catch (Exception e) {
           LOGGER.error(e, "Compaction failed: " + e.getMessage());
         } finally {
    +      if (partitionNames != null) {
    +        try {
    +          new PartitionFileStore().writePartitionMapFile(
    +            CarbonTablePath.getSegmentPath(
    +              carbonLoadModel.getTablePath(),
    +              carbonLoadModel.getSegmentId()),
    +            carbonLoadModel.getTaskNo(), partitionNames);

    --- End diff --

    move partitionNames to next line

---
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1675#discussion_r157952357

    --- Diff: processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java ---
    @@ -150,7 +157,12 @@ public boolean execute(List<RawResultIterator> resultIteratorList) {
           if (isDataPresent) {
             this.dataHandler.closeHandler();
           }
    -    } catch (CarbonDataWriterException e) {
    +      if (partitionNames != null) {
    +        new PartitionFileStore().writePartitionMapFile(
    +          CarbonTablePath.getSegmentPath(loadModel.getTablePath(), loadModel.getSegmentId()),
    +          loadModel.getTaskNo(), partitionNames);

    --- End diff --

    move partitionNames to next line

---
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1675#discussion_r157952439

    --- Diff: processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java ---
    @@ -135,13 +139,15 @@
       * @param tableName

    --- End diff --

    move comment for all these parameters

---
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1675#discussion_r157953727

    --- Diff: processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java ---
    @@ -57,8 +62,10 @@
       public RowResultMergerProcessor(String databaseName, String tableName,
           SegmentProperties segProp, String[] tempStoreLocation,
    -      CarbonLoadModel loadModel, CompactionType compactionType) {
    +      CarbonLoadModel loadModel, CompactionType compactionType, List<String> partitionNames) {
         this.segprop = segProp;
    +    this.partitionNames = partitionNames;
    +    this.loadModel = loadModel;

    --- End diff --

    How about keeping the segment path in this class and using it at line 162, instead of keeping the whole load model in this class?

---
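The suggestion here is a general dependency-narrowing refactor: rather than storing the entire load model only to derive one path from it later, compute the segment path once in the constructor and keep just that. A hedged Java sketch of the shape being proposed; the class name, path layout, and fields are illustrative, not the actual CarbonData code:

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: mirrors the reviewer's suggestion, not the real
// RowResultMergerProcessor. The path layout below is an assumption.
public class SegmentScopedProcessor {
    private final String segmentPath;          // the only thing the close path needs
    private final List<String> partitionNames;

    public SegmentScopedProcessor(String tablePath, String segmentId,
                                  List<String> partitionNames) {
        // Derive the segment path once here instead of retaining the load model.
        this.segmentPath = tablePath + "/Segment_" + segmentId;
        this.partitionNames = partitionNames;
    }

    public String getSegmentPath() {
        return segmentPath;
    }

    public static void main(String[] args) {
        SegmentScopedProcessor p = new SegmentScopedProcessor(
            "/store/db/t1", "2", Arrays.asList("country=IN"));
        System.out.println(p.getSegmentPath()); // prints /store/db/t1/Segment_2
    }
}
```

The design benefit is the usual one: the class no longer depends on everything a load model carries, so it is easier to test and harder to misuse.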
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1675#discussion_r157955683

    --- Diff: processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java ---
    @@ -150,7 +157,12 @@ public boolean execute(List<RawResultIterator> resultIteratorList) {
           if (isDataPresent) {
             this.dataHandler.closeHandler();
           }
    -    } catch (CarbonDataWriterException e) {
    +      if (partitionNames != null) {
    +        new PartitionFileStore().writePartitionMapFile(
    +          CarbonTablePath.getSegmentPath(loadModel.getTablePath(), loadModel.getSegmentId()),
    +          loadModel.getTaskNo(), partitionNames);
    +      }
    +    } catch (CarbonDataWriterException | IOException e) {
           LOGGER.error("Exception while closing the handler in compaction merger " + e.getMessage());

    --- End diff --

    log message is not valid any more

---
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1675

    Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/936/

---
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1675

    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2165/

---
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1675#discussion_r157961992

    --- Diff: streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala ---
    @@ -177,7 +177,8 @@ class StreamHandoffRDD[K, V](
           carbonTable,
           segmentProperties,
           CompactionType.STREAMING,
    -      carbonTable.getTableName
    +      carbonTable.getTableName,
    +      null

    --- End diff --

    It is better to reject setting the streaming property when creating a partition table; I think queries may have issues if we pass null when compacting a streaming table.

---
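The reviewer's point is that an unsupported combination should be rejected at table-creation time rather than papered over by passing `null` at compaction time. That can be sketched as a simple create-time validation; everything below is illustrative and not the actual CarbonData API:

```java
// Illustrative guard: fail fast when the streaming property is set
// on a partition table, instead of passing null partition info later.
public class CreateTableValidator {
    public static void validate(boolean isStreaming, boolean isPartitionTable) {
        if (isStreaming && isPartitionTable) {
            throw new IllegalArgumentException(
                "streaming is not supported on partition tables");
        }
    }

    public static void main(String[] args) {
        validate(true, false);  // streaming, non-partitioned: accepted
        validate(false, true);  // partitioned, non-streaming: accepted
        try {
            validate(true, true);
            throw new AssertionError("expected rejection");
        } catch (IllegalArgumentException expected) {
            System.out.println("rejected: " + expected.getMessage());
        }
    }
}
```

Failing fast here keeps the later compaction path free of a `null` special case that every downstream consumer would otherwise have to handle.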