Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1675#discussion_r157988939 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala --- @@ -428,7 +438,11 @@ class CarbonMergerRDD[K, V]( carbonPartitionId = Integer.parseInt(taskInfo.getTaskId) } result.add( - new CarbonSparkPartition(id, taskPartitionNo, multiBlockSplit, carbonPartitionId)) + new CarbonSparkPartition(id, --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1675#discussion_r157989560 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala --- @@ -456,6 +470,33 @@ class CarbonMergerRDD[K, V]( result.toArray(new Array[Partition](result.size)) } + private def getTaskNo( + split: CarbonInputSplit, + partitionTaskMap: util.Map[List[String], String]): String = { + if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isStandardPartitionTable) { --- End diff -- Yes, it was already changed, but after the rebase I did not fix the compile issues. They are fixed now. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1675#discussion_r157989871 --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala --- @@ -722,8 +723,8 @@ object CarbonDataRDDFactory { val compactionModel = CompactionModel(compactionSize, --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1675#discussion_r157989949 --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala --- @@ -211,8 +212,8 @@ object CarbonDataRDDFactory { val newcompactionModel = CompactionModel(compactionSize, --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1675#discussion_r157990242 --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala --- @@ -130,6 +136,24 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, val validSegments: Array[String] = CarbonDataMergerUtil .getValidSegments(loadsToMerge).split(',') val mergeLoadStartTime = CarbonUpdateUtil.readCurrentTime() + val partitionMapper = if (carbonTable.isStandardPartitionTable) { + var partitionMap: util.Map[String, util.List[String]] = null + validSegments.foreach { segmentId => + val localMapper = new PartitionFileStore() + localMapper.readAllPartitionsOfSegment( + CarbonTablePath.getSegmentPath(carbonLoadModel.getTablePath, segmentId)) + if (partitionMap == null) { + partitionMap = localMapper.getPartitionMap + } else { + partitionMap.putAll(localMapper.getPartitionMap) + } + } + val mapper = new PartitionMapper() + mapper.setPartitionMap(partitionMap) + mapper + } else { + null + } val carbonMergerMapping = CarbonMergerMapping(tablePath, --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1675#discussion_r157990502 --- Diff: processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java --- @@ -168,6 +174,18 @@ public boolean execute(List<RawResultIterator> resultIteratorList) { } catch (Exception e) { LOGGER.error(e, "Compaction failed: " + e.getMessage()); } finally { + if (partitionNames != null) { + try { + new PartitionFileStore().writePartitionMapFile( + CarbonTablePath.getSegmentPath( + carbonLoadModel.getTablePath(), + carbonLoadModel.getSegmentId()), + carbonLoadModel.getTaskNo(), partitionNames); --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1675#discussion_r157990666 --- Diff: processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java --- @@ -150,7 +157,12 @@ public boolean execute(List<RawResultIterator> resultIteratorList) { if (isDataPresent) { this.dataHandler.closeHandler(); } - } catch (CarbonDataWriterException e) { + if (partitionNames != null) { + new PartitionFileStore().writePartitionMapFile( + CarbonTablePath.getSegmentPath(loadModel.getTablePath(), loadModel.getSegmentId()), + loadModel.getTaskNo(), partitionNames); --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1675#discussion_r157990987 --- Diff: processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java --- @@ -135,13 +139,15 @@ * @param tableName --- End diff -- ok --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1675#discussion_r157992006 --- Diff: processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java --- @@ -57,8 +62,10 @@ public RowResultMergerProcessor(String databaseName, String tableName, SegmentProperties segProp, String[] tempStoreLocation, - CarbonLoadModel loadModel, CompactionType compactionType) { + CarbonLoadModel loadModel, CompactionType compactionType, List<String> partitionNames) { this.segprop = segProp; + this.partitionNames = partitionNames; + this.loadModel = loadModel; --- End diff -- But it needs the task number as well; that's why loadModel is kept at class level instead of passing two parameters --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1675#discussion_r157992425 --- Diff: processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java --- @@ -150,7 +157,12 @@ public boolean execute(List<RawResultIterator> resultIteratorList) { if (isDataPresent) { this.dataHandler.closeHandler(); } - } catch (CarbonDataWriterException e) { + if (partitionNames != null) { + new PartitionFileStore().writePartitionMapFile( + CarbonTablePath.getSegmentPath(loadModel.getTablePath(), loadModel.getSegmentId()), + loadModel.getTaskNo(), partitionNames); + } + } catch (CarbonDataWriterException | IOException e) { LOGGER.error("Exception while closing the handler in compaction merger " + e.getMessage()); --- End diff -- ok, updated --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1675#discussion_r157992680 --- Diff: streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala --- @@ -177,7 +177,8 @@ class StreamHandoffRDD[K, V]( carbonTable, segmentProperties, CompactionType.STREAMING, - carbonTable.getTableName + carbonTable.getTableName, + null --- End diff -- Ok, will add the restrictions in another PR --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1675 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2192/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1675 Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/969/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1675 SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2464/ --- |
In reply to this post by qiuchenjian-2
|
In reply to this post by qiuchenjian-2
|
Free forum by Nabble | Edit this page |