Login  Register

[GitHub] [carbondata] Indhumathi27 commented on a change in pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

Posted by GitBox on Dec 29, 2020; 11:28am
URL: http://apache-carbondata-dev-mailing-list-archive.168.s1.nabble.com/GitHub-carbondata-Indhumathi27-opened-a-new-pull-request-4067-WIP-Fix-concurrency-Load-Compaction-isI-tp105150p105178.html


Indhumathi27 commented on a change in pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#discussion_r549670016



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -462,47 +462,67 @@ object CarbonIndexUtil {
             }
           }
       }
-      // check for the skipped segments. compare the main table and SI table table
-      // status file and get the skipped segments if any
-      CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
-        .foreach(metadataDetail => {
-          if (repairLimit > failedLoadMetadataDetails.size()) {
-            val detail = siTblLoadMetadataDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            val mainTableDetail = mainTableDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            if (null == detail || detail.length == 0) {
-              val newDetails = new LoadMetadataDetails
-              newDetails.setLoadName(metadataDetail)
-              LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
-                " table " + indexTableName + "." + carbonTable.getTableName)
-              failedLoadMetadataDetails.add(newDetails)
-            } else if (detail != null && detail.length != 0 && metadataDetail != null
-              && metadataDetail.length != 0) {
-              // If SI table has compacted segments and main table does not have
-              // compacted segments due to some failure while compaction, need to
-              // reload the original segments in this case.
-              if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
-                mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
-                detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
-                // in concurrent scenario, if a compaction is going on table, then SI
-                // segments are updated first in table status and then the main table
-                // segment, so in any load runs parallel this listener shouldn't consider
-                // those segments accidentally. So try to take the segment lock.
-                val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
-                  .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-                    CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
-                      LockUsage.LOCK)
-                if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
-                  segmentLocks += segmentLockOfProbableOnCompactionSeg
-                  LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI "
-                    + "table " + indexTableName + "." + carbonTable.getTableName)
-                  failedLoadMetadataDetails.add(detail(0))
+      val carbonLock = CarbonLockFactory.getCarbonLockObj(
+        carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.COMPACTION_LOCK)
+      try {
+        // In some cases, SI table segment might be in COMPACTED state and main table
+        // compaction might be still in progress. In those cases, we can try to take compaction lock
+        // on main table and then compare and add SI segments to failedLoads, to avoid repairing
+        // SI loads that are already in SUCCESS state.
+        if (carbonLock.lockWithRetries(3, 0)) {
+          val newMainTableDetails = SegmentStatusManager.readLoadMetadata(carbonTable
+            .getMetadataPath)
+          // check for the skipped segments. compare the main table and SI table
+          // status files and get the skipped segments if any
+          CarbonInternalLoaderUtil.getListOfValidSlices(newMainTableDetails).asScala
+            .foreach(metadataDetail => {
+              if (repairLimit > failedLoadMetadataDetails.size()) {
+                val detail = siTblLoadMetadataDetails
+                  .filter(metadata => metadata.getLoadName.equals(metadataDetail))
+                val mainTableDetail = newMainTableDetails
+                  .filter(metadata => metadata.getLoadName.equals(metadataDetail))
+                if (null == detail || detail.length == 0) {
+                  val newDetails = new LoadMetadataDetails
+                  newDetails.setLoadName(metadataDetail)
+                  LOGGER.error(
+                    "Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
+                    " table " + indexTableName + "." + carbonTable.getTableName)
+                  failedLoadMetadataDetails.add(newDetails)
+                } else if (detail != null && detail.length != 0 && metadataDetail != null
+                           && metadataDetail.length != 0) {
+                  // If SI table has compacted segments and main table does not have
+                  // compacted segments due to some failure while compaction, need to
+                  // reload the original segments in this case.
+                  if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
+                      mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
+                    detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
+                    // in concurrent scenario, if a compaction is going on table, then SI
+                    // segments are updated first in table status and then the main table
+                    // segment, so in any load runs parallel this listener shouldn't consider
+                    // those segments accidentally. So try to take the segment lock.
+                    val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
+                      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+                        CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
+                        LockUsage.LOCK)
+                    if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
+                      segmentLocks += segmentLockOfProbableOnCompactionSeg
+                      LOGGER.error(
+                        "Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI "
+                        + "table " + indexTableName + "." + carbonTable.getTableName)
+                      failedLoadMetadataDetails.add(detail(0))
+                    }
+                  }
                 }
               }
-            }
-          }
-        })
+            })
+        } else {
+          LOGGER.error(
+            "Unable to obtain compaction lock for table" + carbonTable.getTableUniqueName)

Review comment:
       done

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -462,47 +462,67 @@ object CarbonIndexUtil {
             }
           }
       }
-      // check for the skipped segments. compare the main table and SI table table
-      // status file and get the skipped segments if any
-      CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
-        .foreach(metadataDetail => {
-          if (repairLimit > failedLoadMetadataDetails.size()) {
-            val detail = siTblLoadMetadataDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            val mainTableDetail = mainTableDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            if (null == detail || detail.length == 0) {
-              val newDetails = new LoadMetadataDetails
-              newDetails.setLoadName(metadataDetail)
-              LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
-                " table " + indexTableName + "." + carbonTable.getTableName)
-              failedLoadMetadataDetails.add(newDetails)
-            } else if (detail != null && detail.length != 0 && metadataDetail != null
-              && metadataDetail.length != 0) {
-              // If SI table has compacted segments and main table does not have
-              // compacted segments due to some failure while compaction, need to
-              // reload the original segments in this case.
-              if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
-                mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
-                detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
-                // in concurrent scenario, if a compaction is going on table, then SI
-                // segments are updated first in table status and then the main table
-                // segment, so in any load runs parallel this listener shouldn't consider
-                // those segments accidentally. So try to take the segment lock.
-                val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
-                  .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-                    CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
-                      LockUsage.LOCK)
-                if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
-                  segmentLocks += segmentLockOfProbableOnCompactionSeg
-                  LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI "
-                    + "table " + indexTableName + "." + carbonTable.getTableName)
-                  failedLoadMetadataDetails.add(detail(0))
+      val carbonLock = CarbonLockFactory.getCarbonLockObj(
+        carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.COMPACTION_LOCK)
+      try {
+        // In some cases, SI table segment might be in COMPACTED state and main table
+        // compaction might be still in progress. In those cases, we can try to take compaction lock
+        // on main table and then compare and add SI segments to failedLoads, to avoid repairing
+        // SI loads that are already in SUCCESS state.
+        if (carbonLock.lockWithRetries(3, 0)) {

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]