
[GitHub] [carbondata] kunal642 commented on a change in pull request #4070: [CARBONDATA-4082] Fix alter table add segment query on adding a segment having delete delta files.

Posted by GitBox on Feb 02, 2021; 11:33am
URL: http://apache-carbondata-dev-mailing-list-archive.168.s1.nabble.com/GitHub-carbondata-Karan980-opened-a-new-pull-request-4070-CARBONDATA-4082-Fix-alter-table-add-segmen-tp105251p105927.html


kunal642 commented on a change in pull request #4070:
URL: https://github.com/apache/carbondata/pull/4070#discussion_r568531096



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
##########
@@ -369,5 +426,64 @@ case class CarbonAddLoadCommand(
     }
   }
 
+  /**
+   * If more than one delete delta file is present for a block, this method picks the delta
+   * file with the highest timestamp, because the default threshold for horizontal compaction
+   * is 1. It is assumed that the threshold for horizontal compaction has not been changed from
+   * its default value, so there will always be exactly one valid delete delta file per block.
+   * This method also sets the number of deleted rows for the segment.
+   */
+  def setValidDeltaFileAndDeletedRowCount(
+      deleteDeltaFiles : ListBuffer[(CarbonFile, String)],
+      segmentUpdateDetails : SegmentUpdateDetails
+      ) : Unit = {
+    var maxDeltaStamp : Long = -1
+    var deletedRowsCount : Long = 0
+    var validDeltaFile : CarbonFile = null
+    deleteDeltaFiles.foreach { deltaFile =>
+      val currentFileTimestamp = CarbonTablePath.DataFileUtil
+        .getTimeStampFromDeleteDeltaFile(deltaFile._2)
+      if (currentFileTimestamp.toLong > maxDeltaStamp) {
+        maxDeltaStamp = currentFileTimestamp.toLong
+        validDeltaFile = deltaFile._1
+      }
+    }
+    val blockDetails =
+      new CarbonDeleteDeltaFileReaderImpl(validDeltaFile.getAbsolutePath).readJson()
+    blockDetails.getBlockletDetails.asScala.foreach { blocklet =>
+      deletedRowsCount = deletedRowsCount + blocklet.getDeletedRows.size()
+    }
+    segmentUpdateDetails.setDeleteDeltaStartTimestamp(maxDeltaStamp.toString)
+    segmentUpdateDetails.setDeleteDeltaEndTimestamp(maxDeltaStamp.toString)
+    segmentUpdateDetails.setDeletedRowsInBlock(deletedRowsCount.toString)
+  }
+
+  /**
+   * Horizontal compaction is not supported for SDK segments, so all delta files are valid.
+   */
+  def readAllDeltaFiles(
+      deleteDeltaFiles : ListBuffer[(CarbonFile, String)],
+      segmentUpdateDetails : SegmentUpdateDetails
+  ) : Unit = {

Review comment:
       Please fix this formatting: move the closing parenthesis up to the line above. Check the rest of the code for the same issue as well.
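For illustration, the change the reviewer is asking for would presumably look like the sketch below, with the closing parenthesis of the parameter list pulled up onto the previous line (the exact style is an assumption based on common Scala conventions, not the final committed code):

    // Sketch: parameter list closed on the same line as the last parameter.
    def readAllDeltaFiles(
        deleteDeltaFiles: ListBuffer[(CarbonFile, String)],
        segmentUpdateDetails: SegmentUpdateDetails): Unit = {
      // ... method body unchanged ...
    }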
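Separately, the max-timestamp selection described in the Javadoc of setValidDeltaFileAndDeletedRowCount can be illustrated in isolation. Below is a minimal, runnable Scala sketch; plain Strings stand in for CarbonFile, and the timestamp is assumed to be pre-extracted from the file name (both stand-ins are assumptions for illustration, not CarbonData APIs):

    import scala.collection.mutable.ListBuffer

    object DeltaFileSelection {
      // Each entry is (file path, timestamp string parsed from the file name).
      // Returns the path and timestamp of the newest delete delta file, if any.
      def pickLatest(deltaFiles: ListBuffer[(String, String)]): Option[(String, Long)] = {
        var maxStamp: Long = -1
        var latest: String = null
        deltaFiles.foreach { case (path, stamp) =>
          if (stamp.toLong > maxStamp) {
            maxStamp = stamp.toLong
            latest = path
          }
        }
        Option(latest).map(p => (p, maxStamp))
      }

      def main(args: Array[String]): Unit = {
        val files = ListBuffer(
          ("block-1_100.deletedelta", "100"),
          ("block-1_300.deletedelta", "300"),
          ("block-1_200.deletedelta", "200"))
        // Prints Some((block-1_300.deletedelta,300)): the highest timestamp wins.
        println(pickLatest(files))
      }
    }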




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]