[GitHub] [carbondata] Indhumathi27 opened a new pull request #4067: [WIP] Fix concurrency(Load&Compaction) issue with SI


[GitHub] [carbondata] Indhumathi27 opened a new pull request #4067: [WIP] Fix concurrency(Load&Compaction) issue with SI

GitBox

Indhumathi27 opened a new pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067


    ### Why is this PR needed?
   
   
    ### What changes were proposed in this PR?
   
       
    ### Does this PR introduce any user interface change?
    - No
    - Yes. (please explain the change and update document)
   
    ### Is any new testcase added?
    - No
    - Yes
   
       
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4067: [WIP] Fix concurrency(Load&Compaction) issue with SI

GitBox

CarbonDataQA2 commented on pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#issuecomment-751816234


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5251/
   



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4067: [WIP] Fix concurrency(Load&Compaction) issue with SI

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#issuecomment-751821307


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3490/
   



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4067: [WIP] Fix concurrency(Load&Compaction) issue with SI

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#issuecomment-751940488


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5252/
   



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4067: [WIP] Fix concurrency(Load&Compaction) issue with SI

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#issuecomment-751940591


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3491/
   



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4067: [WIP] Fix concurrency(Load&Compaction) issue with SI

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#issuecomment-751965233


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5254/
   



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4067: [WIP] Fix concurrency(Load&Compaction) issue with SI

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#issuecomment-751966472


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3493/
   



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#issuecomment-751996416


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3496/
   



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#issuecomment-751996475


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5257/
   



[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

GitBox
In reply to this post by GitBox

ajantha-bhat commented on a change in pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#discussion_r549618492



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -462,47 +462,67 @@ object CarbonIndexUtil {
             }
           }
       }
-      // check for the skipped segments. compare the main table and SI table table
-      // status file and get the skipped segments if any
-      CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
-        .foreach(metadataDetail => {
-          if (repairLimit > failedLoadMetadataDetails.size()) {
-            val detail = siTblLoadMetadataDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            val mainTableDetail = mainTableDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            if (null == detail || detail.length == 0) {
-              val newDetails = new LoadMetadataDetails
-              newDetails.setLoadName(metadataDetail)
-              LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
-                " table " + indexTableName + "." + carbonTable.getTableName)
-              failedLoadMetadataDetails.add(newDetails)
-            } else if (detail != null && detail.length != 0 && metadataDetail != null
-              && metadataDetail.length != 0) {
-              // If SI table has compacted segments and main table does not have
-              // compacted segments due to some failure while compaction, need to
-              // reload the original segments in this case.
-              if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
-                mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
-                detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
-                // in concurrent scenario, if a compaction is going on table, then SI
-                // segments are updated first in table status and then the main table
-                // segment, so in any load runs parallel this listener shouldn't consider
-                // those segments accidentally. So try to take the segment lock.
-                val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
-                  .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-                    CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
-                      LockUsage.LOCK)
-                if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
-                  segmentLocks += segmentLockOfProbableOnCompactionSeg
-                  LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI "
-                    + "table " + indexTableName + "." + carbonTable.getTableName)
-                  failedLoadMetadataDetails.add(detail(0))
+      val carbonLock = CarbonLockFactory.getCarbonLockObj(
+        carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.COMPACTION_LOCK)
+      try {
+        // In some cases, SI table segment might be in COMPACTED state and main table
+        // compaction might be still in progress. In those cases, we can try to take compaction lock
+        // on main table and then compare and add SI segments to failedLoads, to avoid repair
+        // SI SUCCESS loads.
+        if (carbonLock.lockWithRetries(3, 0)) {

Review comment:
       Why not use the default carbonLock.lockWithRetries()?
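
   For illustration, a minimal Scala sketch of the two acquisition styles under discussion, reusing the ICarbonLock API shown in the diff above; the hard-coded retry values are the ones from the patch, and the behaviour of the no-argument overload is assumed to follow the configured Carbon lock properties:

    // Compaction lock on the main table, as created in the patch.
    val compactionLock = CarbonLockFactory.getCarbonLockObj(
      carbonTable.getAbsoluteTableIdentifier, LockUsage.COMPACTION_LOCK)

    // Patch version: at most 3 attempts with no wait between retries.
    val lockedExplicit = compactionLock.lockWithRetries(3, 0)

    // Suggested version: the no-argument overload, which is assumed to pick up the
    // retry count and interval configured through the Carbon lock properties.
    val lockedDefault = compactionLock.lockWithRetries()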

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -462,47 +462,67 @@ object CarbonIndexUtil {
             }
           }
       }
-      // check for the skipped segments. compare the main table and SI table table
-      // status file and get the skipped segments if any
-      CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
-        .foreach(metadataDetail => {
-          if (repairLimit > failedLoadMetadataDetails.size()) {
-            val detail = siTblLoadMetadataDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            val mainTableDetail = mainTableDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            if (null == detail || detail.length == 0) {
-              val newDetails = new LoadMetadataDetails
-              newDetails.setLoadName(metadataDetail)
-              LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
-                " table " + indexTableName + "." + carbonTable.getTableName)
-              failedLoadMetadataDetails.add(newDetails)
-            } else if (detail != null && detail.length != 0 && metadataDetail != null
-              && metadataDetail.length != 0) {
-              // If SI table has compacted segments and main table does not have
-              // compacted segments due to some failure while compaction, need to
-              // reload the original segments in this case.
-              if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
-                mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
-                detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
-                // in concurrent scenario, if a compaction is going on table, then SI
-                // segments are updated first in table status and then the main table
-                // segment, so in any load runs parallel this listener shouldn't consider
-                // those segments accidentally. So try to take the segment lock.
-                val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
-                  .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-                    CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
-                      LockUsage.LOCK)
-                if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
-                  segmentLocks += segmentLockOfProbableOnCompactionSeg
-                  LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI "
-                    + "table " + indexTableName + "." + carbonTable.getTableName)
-                  failedLoadMetadataDetails.add(detail(0))
+      val carbonLock = CarbonLockFactory.getCarbonLockObj(
+        carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.COMPACTION_LOCK)
+      try {
+        // In some cases, SI table segment might be in COMPACTED state and main table
+        // compaction might be still in progress. In those cases, we can try to take compaction lock
+        // on main table and then compare and add SI segments to failedLoads, to avoid repair
+        // SI SUCCESS loads.
+        if (carbonLock.lockWithRetries(3, 0)) {
+          val newMainTableDetails = SegmentStatusManager.readLoadMetadata(carbonTable
+            .getMetadataPath)
+          // check for the skipped segments. compare the main table and SI table table
+          // status file and get the skipped segments if any
+          CarbonInternalLoaderUtil.getListOfValidSlices(newMainTableDetails).asScala
+            .foreach(metadataDetail => {
+              if (repairLimit > failedLoadMetadataDetails.size()) {
+                val detail = siTblLoadMetadataDetails
+                  .filter(metadata => metadata.getLoadName.equals(metadataDetail))
+                val mainTableDetail = newMainTableDetails
+                  .filter(metadata => metadata.getLoadName.equals(metadataDetail))
+                if (null == detail || detail.length == 0) {
+                  val newDetails = new LoadMetadataDetails
+                  newDetails.setLoadName(metadataDetail)
+                  LOGGER.error(
+                    "Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
+                    " table " + indexTableName + "." + carbonTable.getTableName)
+                  failedLoadMetadataDetails.add(newDetails)
+                } else if (detail != null && detail.length != 0 && metadataDetail != null
+                           && metadataDetail.length != 0) {
+                  // If SI table has compacted segments and main table does not have
+                  // compacted segments due to some failure while compaction, need to
+                  // reload the original segments in this case.
+                  if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
+                      mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
+                    detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
+                    // in concurrent scenario, if a compaction is going on table, then SI
+                    // segments are updated first in table status and then the main table
+                    // segment, so in any load runs parallel this listener shouldn't consider
+                    // those segments accidentally. So try to take the segment lock.
+                    val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
+                      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+                        CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
+                        LockUsage.LOCK)
+                    if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
+                      segmentLocks += segmentLockOfProbableOnCompactionSeg
+                      LOGGER.error(
+                        "Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI "
+                        + "table " + indexTableName + "." + carbonTable.getTableName)
+                      failedLoadMetadataDetails.add(detail(0))
+                    }
+                  }
                 }
               }
-            }
-          }
-        })
+            })
+        } else {
+          LOGGER.error(
+            "Unable to obtain compaction lock for table" + carbonTable.getTableUniqueName)

Review comment:
       After your change, if compaction is running in one thread and a load or SI repair is triggered concurrently, the repair cannot succeed, because the main table segments are not considered while the table is under compaction.
   
   So add a log here such as: "Didn't check failed segments for SI as main table is under compaction, please call SI repair again".
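
   For illustration, a hedged sketch of how that could look in the else branch of the patch (log wording taken from this comment, not final):

    if (compactionLock.lockWithRetries(3, 0)) {
      // ... existing repair logic from the patch ...
    } else {
      // The compaction lock on the main table could not be acquired, so failed SI
      // segments were not checked in this run; tell the user to re-run SI repair.
      LOGGER.error("Didn't check failed segments for SI table " + indexTableName +
        " as main table " + carbonTable.getTableUniqueName +
        " is under compaction, please call SI repair again")
    }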

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -462,47 +462,67 @@ object CarbonIndexUtil {
             }
           }
       }
-      // check for the skipped segments. compare the main table and SI table table
-      // status file and get the skipped segments if any
-      CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
-        .foreach(metadataDetail => {
-          if (repairLimit > failedLoadMetadataDetails.size()) {
-            val detail = siTblLoadMetadataDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            val mainTableDetail = mainTableDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            if (null == detail || detail.length == 0) {
-              val newDetails = new LoadMetadataDetails
-              newDetails.setLoadName(metadataDetail)
-              LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
-                " table " + indexTableName + "." + carbonTable.getTableName)
-              failedLoadMetadataDetails.add(newDetails)
-            } else if (detail != null && detail.length != 0 && metadataDetail != null
-              && metadataDetail.length != 0) {
-              // If SI table has compacted segments and main table does not have
-              // compacted segments due to some failure while compaction, need to
-              // reload the original segments in this case.
-              if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
-                mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
-                detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
-                // in concurrent scenario, if a compaction is going on table, then SI
-                // segments are updated first in table status and then the main table
-                // segment, so in any load runs parallel this listener shouldn't consider
-                // those segments accidentally. So try to take the segment lock.
-                val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
-                  .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-                    CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
-                      LockUsage.LOCK)
-                if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
-                  segmentLocks += segmentLockOfProbableOnCompactionSeg
-                  LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI "
-                    + "table " + indexTableName + "." + carbonTable.getTableName)
-                  failedLoadMetadataDetails.add(detail(0))
+      val carbonLock = CarbonLockFactory.getCarbonLockObj(
+        carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.COMPACTION_LOCK)
+      try {
+        // In some cases, SI table segment might be in COMPACTED state and main table
+        // compaction might be still in progress. In those cases, we can try to take compaction lock
+        // on main table and then compare and add SI segments to failedLoads, to avoid repair
+        // SI SUCCESS loads.
+        if (carbonLock.lockWithRetries(3, 0)) {
+          val newMainTableDetails = SegmentStatusManager.readLoadMetadata(carbonTable

Review comment:
       I can see that, for the repair alone, the main table status is read 3 times and the SI table status 2 times. Please optimize this.
   
   Also, when we compare the main table status with the SI table status, it is better to acquire the table status lock on both the main table and the SI table. Otherwise, after the main table status is read, another thread could modify the SI table status and the main table status, which can lead to a wrong comparison result.
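
   A rough sketch of that locking order, assuming LockUsage.TABLE_STATUS_LOCK is the appropriate lock type and that indexTable is the SI table handle already available in this method; both status files are read only while both locks are held:

    val mainStatusLock = CarbonLockFactory.getCarbonLockObj(
      carbonTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
    val siStatusLock = CarbonLockFactory.getCarbonLockObj(
      indexTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
    if (mainStatusLock.lockWithRetries()) {
      try {
        if (siStatusLock.lockWithRetries()) {
          try {
            // Read each status file exactly once while both locks are held, so no
            // other thread can change either file between the two reads.
            val mainDetails = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
            val siDetails = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
            // ... compare mainDetails with siDetails and collect failed SI segments ...
          } finally {
            siStatusLock.unlock()
          }
        }
      } finally {
        mainStatusLock.unlock()
      }
    }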

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -462,47 +462,67 @@ object CarbonIndexUtil {
             }
           }
       }
-      // check for the skipped segments. compare the main table and SI table table
-      // status file and get the skipped segments if any
-      CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
-        .foreach(metadataDetail => {
-          if (repairLimit > failedLoadMetadataDetails.size()) {
-            val detail = siTblLoadMetadataDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            val mainTableDetail = mainTableDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            if (null == detail || detail.length == 0) {
-              val newDetails = new LoadMetadataDetails
-              newDetails.setLoadName(metadataDetail)
-              LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
-                " table " + indexTableName + "." + carbonTable.getTableName)
-              failedLoadMetadataDetails.add(newDetails)
-            } else if (detail != null && detail.length != 0 && metadataDetail != null
-              && metadataDetail.length != 0) {
-              // If SI table has compacted segments and main table does not have
-              // compacted segments due to some failure while compaction, need to
-              // reload the original segments in this case.
-              if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
-                mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
-                detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
-                // in concurrent scenario, if a compaction is going on table, then SI
-                // segments are updated first in table status and then the main table
-                // segment, so in any load runs parallel this listener shouldn't consider
-                // those segments accidentally. So try to take the segment lock.
-                val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
-                  .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-                    CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
-                      LockUsage.LOCK)
-                if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
-                  segmentLocks += segmentLockOfProbableOnCompactionSeg
-                  LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI "
-                    + "table " + indexTableName + "." + carbonTable.getTableName)
-                  failedLoadMetadataDetails.add(detail(0))
+      val carbonLock = CarbonLockFactory.getCarbonLockObj(

Review comment:
       Please rename this to compactionLock, so that when reading its usages further down it is easy to tell which lock it is.





[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

GitBox
In reply to this post by GitBox

ajantha-bhat commented on a change in pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#discussion_r549630875



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -462,47 +462,67 @@ object CarbonIndexUtil {
             }
           }
       }
-      // check for the skipped segments. compare the main table and SI table table
-      // status file and get the skipped segments if any
-      CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
-        .foreach(metadataDetail => {
-          if (repairLimit > failedLoadMetadataDetails.size()) {
-            val detail = siTblLoadMetadataDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            val mainTableDetail = mainTableDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            if (null == detail || detail.length == 0) {
-              val newDetails = new LoadMetadataDetails
-              newDetails.setLoadName(metadataDetail)
-              LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
-                " table " + indexTableName + "." + carbonTable.getTableName)
-              failedLoadMetadataDetails.add(newDetails)
-            } else if (detail != null && detail.length != 0 && metadataDetail != null
-              && metadataDetail.length != 0) {
-              // If SI table has compacted segments and main table does not have
-              // compacted segments due to some failure while compaction, need to
-              // reload the original segments in this case.
-              if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
-                mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
-                detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
-                // in concurrent scenario, if a compaction is going on table, then SI
-                // segments are updated first in table status and then the main table
-                // segment, so in any load runs parallel this listener shouldn't consider
-                // those segments accidentally. So try to take the segment lock.
-                val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
-                  .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-                    CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
-                      LockUsage.LOCK)
-                if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
-                  segmentLocks += segmentLockOfProbableOnCompactionSeg
-                  LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI "
-                    + "table " + indexTableName + "." + carbonTable.getTableName)
-                  failedLoadMetadataDetails.add(detail(0))
+      val carbonLock = CarbonLockFactory.getCarbonLockObj(
+        carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.COMPACTION_LOCK)
+      try {
+        // In some cases, SI table segment might be in COMPACTED state and main table
+        // compaction might be still in progress. In those cases, we can try to take compaction lock
+        // on main table and then compare and add SI segments to failedLoads, to avoid repair
+        // SI SUCCESS loads.
+        if (carbonLock.lockWithRetries(3, 0)) {
+          val newMainTableDetails = SegmentStatusManager.readLoadMetadata(carbonTable

Review comment:
       If possible, clean up the commented-out readLoadMetadata calls in the same method.





[GitHub] [carbondata] Indhumathi27 commented on a change in pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

GitBox
In reply to this post by GitBox

Indhumathi27 commented on a change in pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#discussion_r549669980



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -462,47 +462,67 @@ object CarbonIndexUtil {
             }
           }
       }
-      // check for the skipped segments. compare the main table and SI table table
-      // status file and get the skipped segments if any
-      CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
-        .foreach(metadataDetail => {
-          if (repairLimit > failedLoadMetadataDetails.size()) {
-            val detail = siTblLoadMetadataDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            val mainTableDetail = mainTableDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            if (null == detail || detail.length == 0) {
-              val newDetails = new LoadMetadataDetails
-              newDetails.setLoadName(metadataDetail)
-              LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
-                " table " + indexTableName + "." + carbonTable.getTableName)
-              failedLoadMetadataDetails.add(newDetails)
-            } else if (detail != null && detail.length != 0 && metadataDetail != null
-              && metadataDetail.length != 0) {
-              // If SI table has compacted segments and main table does not have
-              // compacted segments due to some failure while compaction, need to
-              // reload the original segments in this case.
-              if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
-                mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
-                detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
-                // in concurrent scenario, if a compaction is going on table, then SI
-                // segments are updated first in table status and then the main table
-                // segment, so in any load runs parallel this listener shouldn't consider
-                // those segments accidentally. So try to take the segment lock.
-                val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
-                  .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-                    CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
-                      LockUsage.LOCK)
-                if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
-                  segmentLocks += segmentLockOfProbableOnCompactionSeg
-                  LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI "
-                    + "table " + indexTableName + "." + carbonTable.getTableName)
-                  failedLoadMetadataDetails.add(detail(0))
+      val carbonLock = CarbonLockFactory.getCarbonLockObj(

Review comment:
       done





[GitHub] [carbondata] Indhumathi27 commented on a change in pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

GitBox
In reply to this post by GitBox

Indhumathi27 commented on a change in pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#discussion_r549670016



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -462,47 +462,67 @@ object CarbonIndexUtil {
             }
           }
       }
-      // check for the skipped segments. compare the main table and SI table table
-      // status file and get the skipped segments if any
-      CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
-        .foreach(metadataDetail => {
-          if (repairLimit > failedLoadMetadataDetails.size()) {
-            val detail = siTblLoadMetadataDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            val mainTableDetail = mainTableDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            if (null == detail || detail.length == 0) {
-              val newDetails = new LoadMetadataDetails
-              newDetails.setLoadName(metadataDetail)
-              LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
-                " table " + indexTableName + "." + carbonTable.getTableName)
-              failedLoadMetadataDetails.add(newDetails)
-            } else if (detail != null && detail.length != 0 && metadataDetail != null
-              && metadataDetail.length != 0) {
-              // If SI table has compacted segments and main table does not have
-              // compacted segments due to some failure while compaction, need to
-              // reload the original segments in this case.
-              if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
-                mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
-                detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
-                // in concurrent scenario, if a compaction is going on table, then SI
-                // segments are updated first in table status and then the main table
-                // segment, so in any load runs parallel this listener shouldn't consider
-                // those segments accidentally. So try to take the segment lock.
-                val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
-                  .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-                    CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
-                      LockUsage.LOCK)
-                if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
-                  segmentLocks += segmentLockOfProbableOnCompactionSeg
-                  LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI "
-                    + "table " + indexTableName + "." + carbonTable.getTableName)
-                  failedLoadMetadataDetails.add(detail(0))
+      val carbonLock = CarbonLockFactory.getCarbonLockObj(
+        carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.COMPACTION_LOCK)
+      try {
+        // In some cases, SI table segment might be in COMPACTED state and main table
+        // compaction might be still in progress. In those cases, we can try to take compaction lock
+        // on main table and then compare and add SI segments to failedLoads, to avoid repair
+        // SI SUCCESS loads.
+        if (carbonLock.lockWithRetries(3, 0)) {
+          val newMainTableDetails = SegmentStatusManager.readLoadMetadata(carbonTable
+            .getMetadataPath)
+          // check for the skipped segments. compare the main table and SI table table
+          // status file and get the skipped segments if any
+          CarbonInternalLoaderUtil.getListOfValidSlices(newMainTableDetails).asScala
+            .foreach(metadataDetail => {
+              if (repairLimit > failedLoadMetadataDetails.size()) {
+                val detail = siTblLoadMetadataDetails
+                  .filter(metadata => metadata.getLoadName.equals(metadataDetail))
+                val mainTableDetail = newMainTableDetails
+                  .filter(metadata => metadata.getLoadName.equals(metadataDetail))
+                if (null == detail || detail.length == 0) {
+                  val newDetails = new LoadMetadataDetails
+                  newDetails.setLoadName(metadataDetail)
+                  LOGGER.error(
+                    "Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
+                    " table " + indexTableName + "." + carbonTable.getTableName)
+                  failedLoadMetadataDetails.add(newDetails)
+                } else if (detail != null && detail.length != 0 && metadataDetail != null
+                           && metadataDetail.length != 0) {
+                  // If SI table has compacted segments and main table does not have
+                  // compacted segments due to some failure while compaction, need to
+                  // reload the original segments in this case.
+                  if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
+                      mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
+                    detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
+                    // in concurrent scenario, if a compaction is going on table, then SI
+                    // segments are updated first in table status and then the main table
+                    // segment, so in any load runs parallel this listener shouldn't consider
+                    // those segments accidentally. So try to take the segment lock.
+                    val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
+                      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+                        CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
+                        LockUsage.LOCK)
+                    if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
+                      segmentLocks += segmentLockOfProbableOnCompactionSeg
+                      LOGGER.error(
+                        "Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI "
+                        + "table " + indexTableName + "." + carbonTable.getTableName)
+                      failedLoadMetadataDetails.add(detail(0))
+                    }
+                  }
                 }
               }
-            }
-          }
-        })
+            })
+        } else {
+          LOGGER.error(
+            "Unable to obtain compaction lock for table" + carbonTable.getTableUniqueName)

Review comment:
       done

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -462,47 +462,67 @@ object CarbonIndexUtil {
             }
           }
       }
-      // check for the skipped segments. compare the main table and SI table table
-      // status file and get the skipped segments if any
-      CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
-        .foreach(metadataDetail => {
-          if (repairLimit > failedLoadMetadataDetails.size()) {
-            val detail = siTblLoadMetadataDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            val mainTableDetail = mainTableDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            if (null == detail || detail.length == 0) {
-              val newDetails = new LoadMetadataDetails
-              newDetails.setLoadName(metadataDetail)
-              LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
-                " table " + indexTableName + "." + carbonTable.getTableName)
-              failedLoadMetadataDetails.add(newDetails)
-            } else if (detail != null && detail.length != 0 && metadataDetail != null
-              && metadataDetail.length != 0) {
-              // If SI table has compacted segments and main table does not have
-              // compacted segments due to some failure while compaction, need to
-              // reload the original segments in this case.
-              if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
-                mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
-                detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
-                // in concurrent scenario, if a compaction is going on table, then SI
-                // segments are updated first in table status and then the main table
-                // segment, so in any load runs parallel this listener shouldn't consider
-                // those segments accidentally. So try to take the segment lock.
-                val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
-                  .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-                    CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
-                      LockUsage.LOCK)
-                if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
-                  segmentLocks += segmentLockOfProbableOnCompactionSeg
-                  LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI "
-                    + "table " + indexTableName + "." + carbonTable.getTableName)
-                  failedLoadMetadataDetails.add(detail(0))
+      val carbonLock = CarbonLockFactory.getCarbonLockObj(
+        carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.COMPACTION_LOCK)
+      try {
+        // In some cases, SI table segment might be in COMPACTED state and main table
+        // compaction might be still in progress. In those cases, we can try to take compaction lock
+        // on main table and then compare and add SI segments to failedLoads, to avoid repair
+        // SI SUCCESS loads.
+        if (carbonLock.lockWithRetries(3, 0)) {

Review comment:
       done





[GitHub] [carbondata] Indhumathi27 commented on a change in pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

GitBox
In reply to this post by GitBox

Indhumathi27 commented on a change in pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#discussion_r549670126



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -462,47 +462,67 @@ object CarbonIndexUtil {
             }
           }
       }
-      // check for the skipped segments. compare the main table and SI table table
-      // status file and get the skipped segments if any
-      CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
-        .foreach(metadataDetail => {
-          if (repairLimit > failedLoadMetadataDetails.size()) {
-            val detail = siTblLoadMetadataDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            val mainTableDetail = mainTableDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            if (null == detail || detail.length == 0) {
-              val newDetails = new LoadMetadataDetails
-              newDetails.setLoadName(metadataDetail)
-              LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
-                " table " + indexTableName + "." + carbonTable.getTableName)
-              failedLoadMetadataDetails.add(newDetails)
-            } else if (detail != null && detail.length != 0 && metadataDetail != null
-              && metadataDetail.length != 0) {
-              // If SI table has compacted segments and main table does not have
-              // compacted segments due to some failure while compaction, need to
-              // reload the original segments in this case.
-              if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
-                mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
-                detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
-                // in concurrent scenario, if a compaction is going on table, then SI
-                // segments are updated first in table status and then the main table
-                // segment, so in any load runs parallel this listener shouldn't consider
-                // those segments accidentally. So try to take the segment lock.
-                val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
-                  .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-                    CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
-                      LockUsage.LOCK)
-                if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
-                  segmentLocks += segmentLockOfProbableOnCompactionSeg
-                  LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI "
-                    + "table " + indexTableName + "." + carbonTable.getTableName)
-                  failedLoadMetadataDetails.add(detail(0))
+      val carbonLock = CarbonLockFactory.getCarbonLockObj(
+        carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.COMPACTION_LOCK)
+      try {
+        // In some cases, SI table segment might be in COMPACTED state and main table
+        // compaction might be still in progress. In those cases, we can try to take compaction lock
+        // on main table and then compare and add SI segments to failedLoads, to avoid repair
+        // SI SUCCESS loads.
+        if (carbonLock.lockWithRetries(3, 0)) {
+          val newMainTableDetails = SegmentStatusManager.readLoadMetadata(carbonTable

Review comment:
       modified





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#issuecomment-752044813


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3497/
   



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#issuecomment-752044968


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5258/
   



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#issuecomment-752069900


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3498/
   



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#issuecomment-752070020


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5259/
   



[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

GitBox
In reply to this post by GitBox

ajantha-bhat commented on a change in pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#discussion_r549715981



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -406,158 +407,195 @@ object CarbonIndexUtil {
       .asInstanceOf[CarbonRelation]
       .carbonTable
 
-    val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
-      SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
     var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
-    if (!CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
-      mainTableDetails.toArray,
-      siTblLoadMetadataDetails)) {
-      val indexColumns = indexMetadata.getIndexColumns(secondaryIndexProvider,
-        indexTableName)
-      val indexModel = IndexModel(Some(carbonTable.getDatabaseName),
-        indexMetadata.getParentTableName,
-        indexColumns.split(",").toList,
-        indexTableName)
-
-      // var details = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
-      // If it empty, then no need to do further computations because the
-      // tabletstatus might not have been created and hence next load will take care
-      if (siTblLoadMetadataDetails.isEmpty) {
-        Seq.empty
-      }
+    val compactionLock = CarbonLockFactory.getCarbonLockObj(
+      carbonTable.getAbsoluteTableIdentifier,
+      LockUsage.COMPACTION_LOCK)
+    try {
+      // In some cases, SI table segment might be in COMPACTED state and main table
+      // compaction might be still in progress. In those cases, we can try to take compaction lock
+      // on main table and then compare and add SI segments to failedLoads, to avoid repair
+      // SI SUCCESS loads.
+      if (compactionLock.lockWithRetries()) {
+        var mainTableDetails = try {
+          SegmentStatusManager.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(
+            carbonTable.getTablePath))
+        } catch {
+          case _: Exception =>
+            if (!isLoadOrCompaction) {
+              throw new ConcurrentOperationException(carbonTable.getDatabaseName,

Review comment:
       I don't think this needs to be a ConcurrentOperationException; it can only throw an IOException, so in that case just rethrow the same exception.
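
   A minimal sketch of that alternative (java.io.IOException is assumed to be imported; identifiers reused from the diff above):

    val mainTableDetails = try {
      SegmentStatusManager.readTableStatusFile(
        CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
    } catch {
      case e: IOException =>
        // Propagate the original read failure instead of wrapping it in a
        // ConcurrentOperationException.
        LOGGER.error("Failed to read table status of " + carbonTable.getTableUniqueName, e)
        throw e
    }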





[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

GitBox
In reply to this post by GitBox

ajantha-bhat commented on a change in pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#discussion_r549716227



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -406,158 +407,195 @@ object CarbonIndexUtil {
       .asInstanceOf[CarbonRelation]
       .carbonTable
 
-    val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
-      SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
     var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
-    if (!CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
-      mainTableDetails.toArray,
-      siTblLoadMetadataDetails)) {
-      val indexColumns = indexMetadata.getIndexColumns(secondaryIndexProvider,
-        indexTableName)
-      val indexModel = IndexModel(Some(carbonTable.getDatabaseName),
-        indexMetadata.getParentTableName,
-        indexColumns.split(",").toList,
-        indexTableName)
-
-      // var details = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
-      // If it empty, then no need to do further computations because the
-      // tabletstatus might not have been created and hence next load will take care
-      if (siTblLoadMetadataDetails.isEmpty) {
-        Seq.empty
-      }
+    val compactionLock = CarbonLockFactory.getCarbonLockObj(
+      carbonTable.getAbsoluteTableIdentifier,
+      LockUsage.COMPACTION_LOCK)
+    try {
+      // In some cases, SI table segment might be in COMPACTED state and main table
+      // compaction might be still in progress. In those cases, we can try to take compaction lock
+      // on main table and then compare and add SI segments to failedLoads, to avoid repair
+      // SI SUCCESS loads.
+      if (compactionLock.lockWithRetries()) {
+        var mainTableDetails = try {
+          SegmentStatusManager.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(
+            carbonTable.getTablePath))
+        } catch {
+          case _: Exception =>
+            if (!isLoadOrCompaction) {
+              throw new ConcurrentOperationException(carbonTable.getDatabaseName,
+                carbonTable.getTableName, "table status read", "reindex command")
+            }
+            return;

Review comment:
       In the else case, it is better to add an error log.
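
   Sketched against the catch block in the diff above, with the log text being illustrative only:

    val mainTableDetails = try {
      SegmentStatusManager.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(
        carbonTable.getTablePath))
    } catch {
      case e: Exception =>
        if (!isLoadOrCompaction) {
          throw new ConcurrentOperationException(carbonTable.getDatabaseName,
            carbonTable.getTableName, "table status read", "reindex command")
        } else {
          // Do not return silently during a concurrent load/compaction; record why
          // the SI repair was skipped.
          LOGGER.error("Skipping SI repair for " + indexTableName + " because the" +
            " table status of " + carbonTable.getTableUniqueName + " could not be read", e)
        }
        return
    }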



