[GitHub] [carbondata] jackylk commented on a change in pull request #3475: [CARBONDATA-3594] Optimize getSplits() during compaction

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] jackylk commented on a change in pull request #3475: [CARBONDATA-3594] Optimize getSplits() during compaction

GitBox
jackylk commented on a change in pull request #3475: [CARBONDATA-3594] Optimize getSplits() during compaction
URL: https://github.com/apache/carbondata/pull/3475#discussion_r349930173
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
 ##########
 @@ -369,44 +371,48 @@ class CarbonMergerRDD[K, V](
           }
         }
       }
+      validSegIds.add(eachSeg.getSegmentNo)
+    }
 
-      // map for keeping the relation of a task and its blocks.
-      job.getConfiguration.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg.getSegmentNo)
-
+    // map for keeping the relation of a task and its blocks.
+    job.getConfiguration
+      .set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, validSegIds.asScala.mkString(","))
+
+    val updated: Boolean = updateStatusManager.getUpdateStatusDetails.length != 0
+    // get splits
+    val splits = format.getSplits(job)
+
+    // keep on assigning till last one is reached.
+    if (null != splits && splits.size > 0) {
+      splitsOfLastSegment = splits.asScala
+        .map(_.asInstanceOf[CarbonInputSplit])
+        .filter { split => FileFormat.COLUMNAR_V3.equals(split.getFileFormat) }.toList.asJava
+    }
+    val filteredSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter { entry =>
+      val segmentId = Segment.toSegment(entry.getSegmentId).getSegmentNo
+      val blockInfo = new TableBlockInfo(entry.getFilePath,
+        entry.getStart, entry.getSegmentId,
+        entry.getLocations, entry.getLength, entry.getVersion,
+        updateStatusManager.getDeleteDeltaFilePath(
+          entry.getFilePath,
+          segmentId)
+      )
       if (updateStatusManager.getUpdateStatusDetails.length != 0) {
-         updateDetails = updateStatusManager.getInvalidTimestampRange(eachSeg.getSegmentNo)
-      }
-
-      val updated: Boolean = updateStatusManager.getUpdateStatusDetails.length != 0
-      // get splits
-      val splits = format.getSplits(job)
-
-      // keep on assigning till last one is reached.
-      if (null != splits && splits.size > 0) {
-        splitsOfLastSegment = splits.asScala
-          .map(_.asInstanceOf[CarbonInputSplit])
-          .filter { split => FileFormat.COLUMNAR_V3.equals(split.getFileFormat) }.toList.asJava
-      }
-       val filteredSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter{ entry =>
-        val blockInfo = new TableBlockInfo(entry.getFilePath,
-          entry.getStart, entry.getSegmentId,
-          entry.getLocations, entry.getLength, entry.getVersion,
-          updateStatusManager.getDeleteDeltaFilePath(
-            entry.getFilePath,
-            Segment.toSegment(entry.getSegmentId).getSegmentNo)
-        )
-        (!updated || (updated && (!CarbonUtil
-          .isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath,
-            updateDetails, updateStatusManager)))) &&
-        FileFormat.COLUMNAR_V3.equals(entry.getFileFormat)
+        updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId)
       }
-      if (rangeColumn != null) {
-        totalTaskCount = totalTaskCount +
-                         CarbonCompactionUtil.getTaskCountForSegment(filteredSplits.toArray)
-      }
-      carbonInputSplits ++:= filteredSplits
-      allSplits.addAll(filteredSplits.asJava)
+      // filter splits with V3 data file format
+      // if split is updated, then check for if it is valid segment based on update details
+      (!updated ||
+       (updated && (!CarbonUtil.isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath,
+         updateDetails, updateStatusManager)))) &&
+      FileFormat.COLUMNAR_V3.equals(entry.getFileFormat)
+    }
+    if (rangeColumn != null) {
+      totalTaskCount = totalTaskCount +
+                       CarbonCompactionUtil.getTaskCountForSegment(filteredSplits.toArray)
     }
+    carbonInputSplits ++:= filteredSplits
 
 Review comment:
   Please do not use `++:=` it is creating new list, you can use `++=` instead

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[hidden email]


With regards,
Apache Git Services