Posted by
GitBox on
Dec 29, 2020; 9:20am
URL: http://apache-carbondata-dev-mailing-list-archive.168.s1.nabble.com/GitHub-carbondata-Indhumathi27-opened-a-new-pull-request-4067-WIP-Fix-concurrency-Load-Compaction-isI-tp105150p105175.html
ajantha-bhat commented on a change in pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#discussion_r549630875
##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -462,47 +462,67 @@ object CarbonIndexUtil {
}
}
}
- // check for the skipped segments. compare the main table and SI table table
- // status file and get the skipped segments if any
- CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
- .foreach(metadataDetail => {
- if (repairLimit > failedLoadMetadataDetails.size()) {
- val detail = siTblLoadMetadataDetails
- .filter(metadata => metadata.getLoadName.equals(metadataDetail))
- val mainTableDetail = mainTableDetails
- .filter(metadata => metadata.getLoadName.equals(metadataDetail))
- if (null == detail || detail.length == 0) {
- val newDetails = new LoadMetadataDetails
- newDetails.setLoadName(metadataDetail)
- LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
- " table " + indexTableName + "." + carbonTable.getTableName)
- failedLoadMetadataDetails.add(newDetails)
- } else if (detail != null && detail.length != 0 && metadataDetail != null
- && metadataDetail.length != 0) {
- // If SI table has compacted segments and main table does not have
- // compacted segments due to some failure while compaction, need to
- // reload the original segments in this case.
- if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
- mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
- detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
- // in concurrent scenario, if a compaction is going on table, then SI
- // segments are updated first in table status and then the main table
- // segment, so in any load runs parallel this listener shouldn't consider
- // those segments accidentally. So try to take the segment lock.
- val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
- .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
- CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
- LockUsage.LOCK)
- if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
- segmentLocks += segmentLockOfProbableOnCompactionSeg
- LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI "
- + "table " + indexTableName + "." + carbonTable.getTableName)
- failedLoadMetadataDetails.add(detail(0))
+ val carbonLock = CarbonLockFactory.getCarbonLockObj(
+ carbonTable.getAbsoluteTableIdentifier,
+ LockUsage.COMPACTION_LOCK)
+ try {
+ // In some cases, SI table segment might be in COMPACTED state and main table
+ // compaction might be still in progress. In those cases, we can try to take compaction lock
+ // on main table and then compare and add SI segments to failedLoads, to avoid repair
+ // SI SUCCESS loads.
+ if (carbonLock.lockWithRetries(3, 0)) {
+ val newMainTableDetails = SegmentStatusManager.readLoadMetadata(carbonTable
Review comment:
If possible, also clean up the commented-out readLoadMetadata function calls in the same method.
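
For context, the approach described in the new hunk's comment (take the compaction lock first, then re-read the segment statuses and compare the main table against the SI table) can be sketched roughly as below. This is a minimal, self-contained illustration and not CarbonData code: `Segment`, `withLock`, and `segmentsToRepair` are hypothetical names, the statuses are plain strings, and a `java.util.concurrent.locks.ReentrantLock` stands in for the lock object that the PR obtains from `CarbonLockFactory`.

```scala
import java.util.concurrent.locks.ReentrantLock

object CompactionLockSketch {
  // Hypothetical stand-in for a segment entry; not the real LoadMetadataDetails.
  final case class Segment(name: String, status: String)

  // Hypothetical helper: runs `body` only if the lock can be acquired,
  // and always releases the lock afterwards. Returns None when the lock is busy.
  def withLock[T](lock: ReentrantLock)(body: => T): Option[T] = {
    if (lock.tryLock()) {
      try Some(body) finally lock.unlock()
    } else {
      None
    }
  }

  // Names of SI segments marked COMPACTED whose main-table segment is still SUCCESS.
  // These are the segments the removed code treated as needing a reload.
  def segmentsToRepair(mainTable: Seq[Segment], siTable: Seq[Segment]): Seq[String] = {
    val mainStatus = mainTable.map(s => s.name -> s.status).toMap
    siTable.collect {
      case Segment(name, "COMPACTED") if mainStatus.get(name).contains("SUCCESS") => name
    }
  }

  def main(args: Array[String]): Unit = {
    val compactionLock = new ReentrantLock()
    // Read both segment lists only while the lock is held, so that an
    // in-progress compaction cannot change them in the middle of the comparison.
    val result = withLock(compactionLock) {
      val mainTable = Seq(Segment("0", "SUCCESS"), Segment("1", "SUCCESS"))
      val siTable = Seq(Segment("0", "COMPACTED"), Segment("1", "SUCCESS"))
      segmentsToRepair(mainTable, siTable)
    }
    println(result)
  }
}
```

The point of the sketch is the ordering: the comparison runs only inside the locked section, and when the lock cannot be acquired the caller gets `None` and skips the repair check instead of acting on a possibly stale view.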