Indhumathi27 opened a new pull request #4067: URL: https://github.com/apache/carbondata/pull/4067 ### Why is this PR needed? ### What changes were proposed in this PR? ### Does this PR introduce any user interface change? - No - Yes. (please explain the change and update document) ### Is any new testcase added? - No - Yes ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
CarbonDataQA2 commented on pull request #4067: URL: https://github.com/apache/carbondata/pull/4067#issuecomment-751816234 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5251/
CarbonDataQA2 commented on pull request #4067: URL: https://github.com/apache/carbondata/pull/4067#issuecomment-751821307 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3490/
CarbonDataQA2 commented on pull request #4067: URL: https://github.com/apache/carbondata/pull/4067#issuecomment-751940488 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5252/
CarbonDataQA2 commented on pull request #4067: URL: https://github.com/apache/carbondata/pull/4067#issuecomment-751940591 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3491/
CarbonDataQA2 commented on pull request #4067: URL: https://github.com/apache/carbondata/pull/4067#issuecomment-751965233 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5254/
CarbonDataQA2 commented on pull request #4067: URL: https://github.com/apache/carbondata/pull/4067#issuecomment-751966472 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3493/
CarbonDataQA2 commented on pull request #4067: URL: https://github.com/apache/carbondata/pull/4067#issuecomment-751996416 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3496/
CarbonDataQA2 commented on pull request #4067: URL: https://github.com/apache/carbondata/pull/4067#issuecomment-751996475 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5257/
ajantha-bhat commented on a change in pull request #4067: URL: https://github.com/apache/carbondata/pull/4067#discussion_r549618492 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala ########## @@ -462,47 +462,67 @@ object CarbonIndexUtil { } } } - // check for the skipped segments. compare the main table and SI table table - // status file and get the skipped segments if any - CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala - .foreach(metadataDetail => { - if (repairLimit > failedLoadMetadataDetails.size()) { - val detail = siTblLoadMetadataDetails - .filter(metadata => metadata.getLoadName.equals(metadataDetail)) - val mainTableDetail = mainTableDetails - .filter(metadata => metadata.getLoadName.equals(metadataDetail)) - if (null == detail || detail.length == 0) { - val newDetails = new LoadMetadataDetails - newDetails.setLoadName(metadataDetail) - LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" + - " table " + indexTableName + "." + carbonTable.getTableName) - failedLoadMetadataDetails.add(newDetails) - } else if (detail != null && detail.length != 0 && metadataDetail != null - && metadataDetail.length != 0) { - // If SI table has compacted segments and main table does not have - // compacted segments due to some failure while compaction, need to - // reload the original segments in this case. - if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED && - mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) { - detail(0).setSegmentStatus(SegmentStatus.SUCCESS) - // in concurrent scenario, if a compaction is going on table, then SI - // segments are updated first in table status and then the main table - // segment, so in any load runs parallel this listener shouldn't consider - // those segments accidentally. So try to take the segment lock. 
- val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory - .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, - CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) + - LockUsage.LOCK) - if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) { - segmentLocks += segmentLockOfProbableOnCompactionSeg - LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI " - + "table " + indexTableName + "." + carbonTable.getTableName) - failedLoadMetadataDetails.add(detail(0)) + val carbonLock = CarbonLockFactory.getCarbonLockObj( + carbonTable.getAbsoluteTableIdentifier, + LockUsage.COMPACTION_LOCK) + try { + // In some cases, SI table segment might be in COMPACTED state and main table + // compaction might be still in progress. In those cases, we can try to take compaction lock + // on main table and then compare and add SI segments to failedLoads, to avoid repair + // SI SUCCESS loads. + if (carbonLock.lockWithRetries(3, 0)) { Review comment: why not use default carbonLock.lockWithRetries() ? ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala ########## @@ -462,47 +462,67 @@ object CarbonIndexUtil { } } } - // check for the skipped segments. compare the main table and SI table table - // status file and get the skipped segments if any - CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala - .foreach(metadataDetail => { - if (repairLimit > failedLoadMetadataDetails.size()) { - val detail = siTblLoadMetadataDetails - .filter(metadata => metadata.getLoadName.equals(metadataDetail)) - val mainTableDetail = mainTableDetails - .filter(metadata => metadata.getLoadName.equals(metadataDetail)) - if (null == detail || detail.length == 0) { - val newDetails = new LoadMetadataDetails - newDetails.setLoadName(metadataDetail) - LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" + - " table " + indexTableName + "." 
+ carbonTable.getTableName) - failedLoadMetadataDetails.add(newDetails) - } else if (detail != null && detail.length != 0 && metadataDetail != null - && metadataDetail.length != 0) { - // If SI table has compacted segments and main table does not have - // compacted segments due to some failure while compaction, need to - // reload the original segments in this case. - if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED && - mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) { - detail(0).setSegmentStatus(SegmentStatus.SUCCESS) - // in concurrent scenario, if a compaction is going on table, then SI - // segments are updated first in table status and then the main table - // segment, so in any load runs parallel this listener shouldn't consider - // those segments accidentally. So try to take the segment lock. - val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory - .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, - CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) + - LockUsage.LOCK) - if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) { - segmentLocks += segmentLockOfProbableOnCompactionSeg - LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI " - + "table " + indexTableName + "." + carbonTable.getTableName) - failedLoadMetadataDetails.add(detail(0)) + val carbonLock = CarbonLockFactory.getCarbonLockObj( + carbonTable.getAbsoluteTableIdentifier, + LockUsage.COMPACTION_LOCK) + try { + // In some cases, SI table segment might be in COMPACTED state and main table + // compaction might be still in progress. In those cases, we can try to take compaction lock + // on main table and then compare and add SI segments to failedLoads, to avoid repair + // SI SUCCESS loads. + if (carbonLock.lockWithRetries(3, 0)) { + val newMainTableDetails = SegmentStatusManager.readLoadMetadata(carbonTable + .getMetadataPath) + // check for the skipped segments. 
compare the main table and SI table table + // status file and get the skipped segments if any + CarbonInternalLoaderUtil.getListOfValidSlices(newMainTableDetails).asScala + .foreach(metadataDetail => { + if (repairLimit > failedLoadMetadataDetails.size()) { + val detail = siTblLoadMetadataDetails + .filter(metadata => metadata.getLoadName.equals(metadataDetail)) + val mainTableDetail = newMainTableDetails + .filter(metadata => metadata.getLoadName.equals(metadataDetail)) + if (null == detail || detail.length == 0) { + val newDetails = new LoadMetadataDetails + newDetails.setLoadName(metadataDetail) + LOGGER.error( + "Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" + + " table " + indexTableName + "." + carbonTable.getTableName) + failedLoadMetadataDetails.add(newDetails) + } else if (detail != null && detail.length != 0 && metadataDetail != null + && metadataDetail.length != 0) { + // If SI table has compacted segments and main table does not have + // compacted segments due to some failure while compaction, need to + // reload the original segments in this case. + if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED && + mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) { + detail(0).setSegmentStatus(SegmentStatus.SUCCESS) + // in concurrent scenario, if a compaction is going on table, then SI + // segments are updated first in table status and then the main table + // segment, so in any load runs parallel this listener shouldn't consider + // those segments accidentally. So try to take the segment lock. 
+ val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory + .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, + CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) + + LockUsage.LOCK) + if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) { + segmentLocks += segmentLockOfProbableOnCompactionSeg + LOGGER.error( + "Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI " + + "table " + indexTableName + "." + carbonTable.getTableName) + failedLoadMetadataDetails.add(detail(0)) + } + } } } - } - } - }) + }) + } else { + LOGGER.error( + "Unable to obtain compaction lock for table" + carbonTable.getTableUniqueName) Review comment: After your change, if compaction is running in one thread and a load or SI repair is called concurrently, the repair cannot succeed, because the main table segments are not considered while the table is under compaction. So add a log here such as "Didn't check failed segments for SI as main table is under compaction, please call SI repair again". ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala ########## @@ -462,47 +462,67 @@ object CarbonIndexUtil { } } } - // check for the skipped segments. compare the main table and SI table table - // status file and get the skipped segments if any - CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala - .foreach(metadataDetail => { - if (repairLimit > failedLoadMetadataDetails.size()) { - val detail = siTblLoadMetadataDetails - .filter(metadata => metadata.getLoadName.equals(metadataDetail)) - val mainTableDetail = mainTableDetails - .filter(metadata => metadata.getLoadName.equals(metadataDetail)) - if (null == detail || detail.length == 0) { - val newDetails = new LoadMetadataDetails - newDetails.setLoadName(metadataDetail) - LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" + - " table " + indexTableName + "." 
+ carbonTable.getTableName) - failedLoadMetadataDetails.add(newDetails) - } else if (detail != null && detail.length != 0 && metadataDetail != null - && metadataDetail.length != 0) { - // If SI table has compacted segments and main table does not have - // compacted segments due to some failure while compaction, need to - // reload the original segments in this case. - if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED && - mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) { - detail(0).setSegmentStatus(SegmentStatus.SUCCESS) - // in concurrent scenario, if a compaction is going on table, then SI - // segments are updated first in table status and then the main table - // segment, so in any load runs parallel this listener shouldn't consider - // those segments accidentally. So try to take the segment lock. - val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory - .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, - CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) + - LockUsage.LOCK) - if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) { - segmentLocks += segmentLockOfProbableOnCompactionSeg - LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI " - + "table " + indexTableName + "." + carbonTable.getTableName) - failedLoadMetadataDetails.add(detail(0)) + val carbonLock = CarbonLockFactory.getCarbonLockObj( + carbonTable.getAbsoluteTableIdentifier, + LockUsage.COMPACTION_LOCK) + try { + // In some cases, SI table segment might be in COMPACTED state and main table + // compaction might be still in progress. In those cases, we can try to take compaction lock + // on main table and then compare and add SI segments to failedLoads, to avoid repair + // SI SUCCESS loads. 
+ if (carbonLock.lockWithRetries(3, 0)) { + val newMainTableDetails = SegmentStatusManager.readLoadMetadata(carbonTable Review comment: I can see that, for the repair alone, the main table status is read 3 times and the SI table status 2 times. Please optimize. Also, when we compare the main table status and the SI table status, it is good to acquire the lock for both. Otherwise, after the main table status is read, some other thread can modify the SI table status and the main table status, which can cause a wrong result in the comparison. ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala ########## @@ -462,47 +462,67 @@ object CarbonIndexUtil { } } } - // check for the skipped segments. compare the main table and SI table table - // status file and get the skipped segments if any - CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala - .foreach(metadataDetail => { - if (repairLimit > failedLoadMetadataDetails.size()) { - val detail = siTblLoadMetadataDetails - .filter(metadata => metadata.getLoadName.equals(metadataDetail)) - val mainTableDetail = mainTableDetails - .filter(metadata => metadata.getLoadName.equals(metadataDetail)) - if (null == detail || detail.length == 0) { - val newDetails = new LoadMetadataDetails - newDetails.setLoadName(metadataDetail) - LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" + - " table " + indexTableName + "." + carbonTable.getTableName) - failedLoadMetadataDetails.add(newDetails) - } else if (detail != null && detail.length != 0 && metadataDetail != null - && metadataDetail.length != 0) { - // If SI table has compacted segments and main table does not have - // compacted segments due to some failure while compaction, need to - // reload the original segments in this case. 
- if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED && - mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) { - detail(0).setSegmentStatus(SegmentStatus.SUCCESS) - // in concurrent scenario, if a compaction is going on table, then SI - // segments are updated first in table status and then the main table - // segment, so in any load runs parallel this listener shouldn't consider - // those segments accidentally. So try to take the segment lock. - val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory - .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, - CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) + - LockUsage.LOCK) - if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) { - segmentLocks += segmentLockOfProbableOnCompactionSeg - LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI " - + "table " + indexTableName + "." + carbonTable.getTableName) - failedLoadMetadataDetails.add(detail(0)) + val carbonLock = CarbonLockFactory.getCarbonLockObj( Review comment: Please rename this to compactionLock, so that when reading its usages further down it is easy to know which lock it is.
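The locking idea discussed in this review (take the main table's compaction lock, re-read the main table status, and only then compare it with the SI table status) can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code from the PR: `SimpleLock`, `InMemoryLock`, `LockGuardedRepair`, `findSegmentsToRepair` and the two-value `Status` type are hypothetical stand-ins, and CarbonData's real lock and segment-status classes are deliberately not used.

```scala
// Hypothetical stand-in for a table-level lock; NOT CarbonData's lock API.
trait SimpleLock {
  def tryLock(): Boolean
  def unlock(): Unit
}

// In-memory lock used only to make the sketch runnable.
final class InMemoryLock extends SimpleLock {
  private val held = new java.util.concurrent.atomic.AtomicBoolean(false)
  def tryLock(): Boolean = held.compareAndSet(false, true)
  def unlock(): Unit = held.set(false)
}

object LockGuardedRepair {
  // Simplified segment statuses (assumption: only these two matter here).
  sealed trait Status
  case object Success extends Status
  case object Compacted extends Status

  /**
   * Returns Some(segments needing SI repair) when the compaction lock was
   * obtained, or None when it was not, in which case the caller should log
   * that the check was skipped and that repair has to be called again.
   */
  def findSegmentsToRepair(
      compactionLock: SimpleLock,
      readMainStatus: () => Map[String, Status],
      siStatus: Map[String, Status]): Option[Seq[String]] = {
    if (!compactionLock.tryLock()) {
      None
    } else {
      try {
        // Re-read the main table status only after the lock is held, so a
        // compaction that is still in progress cannot change it under us.
        val mainStatus = readMainStatus()
        val validSegments =
          mainStatus.collect { case (name, Success) => name }.toSeq.sorted
        Some(validSegments.filter { name =>
          siStatus.get(name) match {
            case None => true            // segment missing in the SI table
            case Some(Compacted) => true // SI compacted, main still SUCCESS
            case Some(_) => false
          }
        })
      } finally {
        compactionLock.unlock()
      }
    }
  }
}
```

Returning `None` when the lock is unavailable keeps the "skipped because the main table is under compaction" case distinct from "nothing to repair", which is the situation the requested log message is meant to report.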
ajantha-bhat commented on a change in pull request #4067: URL: https://github.com/apache/carbondata/pull/4067#discussion_r549630875 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala ########## @@ -462,47 +462,67 @@ object CarbonIndexUtil { } } } - // check for the skipped segments. compare the main table and SI table table - // status file and get the skipped segments if any - CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala - .foreach(metadataDetail => { - if (repairLimit > failedLoadMetadataDetails.size()) { - val detail = siTblLoadMetadataDetails - .filter(metadata => metadata.getLoadName.equals(metadataDetail)) - val mainTableDetail = mainTableDetails - .filter(metadata => metadata.getLoadName.equals(metadataDetail)) - if (null == detail || detail.length == 0) { - val newDetails = new LoadMetadataDetails - newDetails.setLoadName(metadataDetail) - LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" + - " table " + indexTableName + "." + carbonTable.getTableName) - failedLoadMetadataDetails.add(newDetails) - } else if (detail != null && detail.length != 0 && metadataDetail != null - && metadataDetail.length != 0) { - // If SI table has compacted segments and main table does not have - // compacted segments due to some failure while compaction, need to - // reload the original segments in this case. - if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED && - mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) { - detail(0).setSegmentStatus(SegmentStatus.SUCCESS) - // in concurrent scenario, if a compaction is going on table, then SI - // segments are updated first in table status and then the main table - // segment, so in any load runs parallel this listener shouldn't consider - // those segments accidentally. So try to take the segment lock. 
- val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory - .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, - CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) + - LockUsage.LOCK) - if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) { - segmentLocks += segmentLockOfProbableOnCompactionSeg - LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI " - + "table " + indexTableName + "." + carbonTable.getTableName) - failedLoadMetadataDetails.add(detail(0)) + val carbonLock = CarbonLockFactory.getCarbonLockObj( + carbonTable.getAbsoluteTableIdentifier, + LockUsage.COMPACTION_LOCK) + try { + // In some cases, SI table segment might be in COMPACTED state and main table + // compaction might be still in progress. In those cases, we can try to take compaction lock + // on main table and then compare and add SI segments to failedLoads, to avoid repair + // SI SUCCESS loads. + if (carbonLock.lockWithRetries(3, 0)) { + val newMainTableDetails = SegmentStatusManager.readLoadMetadata(carbonTable Review comment: If possible, clean up those commented readLoadMetadata function calls in the same method.
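One way to address the repeated status reads raised in this thread is to read each table status once and run every comparison against that snapshot. The sketch below only illustrates the idea under stated assumptions: `Snapshot`, `SingleReadComparison`, `CountingReader`, `missingInSi` and `statusMismatch` are hypothetical names, statuses are plain strings, and no CarbonData API is called.

```scala
// Hypothetical snapshot of both table status files, keyed by segment name.
final case class Snapshot(main: Map[String, String], si: Map[String, String])

object SingleReadComparison {
  // Stand-in for a status-file reader that counts how often it is invoked.
  final class CountingReader(data: Map[String, String]) {
    private var count = 0
    def read(): Map[String, String] = { count += 1; data }
    def reads: Int = count
  }

  // Read each status exactly once; all later checks reuse the snapshot.
  def takeSnapshot(mainReader: CountingReader, siReader: CountingReader): Snapshot =
    Snapshot(mainReader.read(), siReader.read())

  // Segments present in the main table but absent from the SI table.
  def missingInSi(s: Snapshot): Seq[String] =
    s.main.keys.filterNot(s.si.contains).toSeq.sorted

  // Segments present in both tables whose statuses differ.
  def statusMismatch(s: Snapshot): Seq[String] =
    s.main.collect {
      case (seg, status) if s.si.get(seg).exists(_ != status) => seg
    }.toSeq.sorted
}
```

Because both checks are pure functions of the same `Snapshot`, they cannot observe two different versions of a status file, provided the snapshot itself is taken while the relevant locks are held.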
Indhumathi27 commented on a change in pull request #4067: URL: https://github.com/apache/carbondata/pull/4067#discussion_r549669980 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala ########## @@ -462,47 +462,67 @@ object CarbonIndexUtil { } } } - // check for the skipped segments. compare the main table and SI table table - // status file and get the skipped segments if any - CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala - .foreach(metadataDetail => { - if (repairLimit > failedLoadMetadataDetails.size()) { - val detail = siTblLoadMetadataDetails - .filter(metadata => metadata.getLoadName.equals(metadataDetail)) - val mainTableDetail = mainTableDetails - .filter(metadata => metadata.getLoadName.equals(metadataDetail)) - if (null == detail || detail.length == 0) { - val newDetails = new LoadMetadataDetails - newDetails.setLoadName(metadataDetail) - LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" + - " table " + indexTableName + "." + carbonTable.getTableName) - failedLoadMetadataDetails.add(newDetails) - } else if (detail != null && detail.length != 0 && metadataDetail != null - && metadataDetail.length != 0) { - // If SI table has compacted segments and main table does not have - // compacted segments due to some failure while compaction, need to - // reload the original segments in this case. - if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED && - mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) { - detail(0).setSegmentStatus(SegmentStatus.SUCCESS) - // in concurrent scenario, if a compaction is going on table, then SI - // segments are updated first in table status and then the main table - // segment, so in any load runs parallel this listener shouldn't consider - // those segments accidentally. So try to take the segment lock. 
- val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory - .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, - CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) + - LockUsage.LOCK) - if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) { - segmentLocks += segmentLockOfProbableOnCompactionSeg - LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI " - + "table " + indexTableName + "." + carbonTable.getTableName) - failedLoadMetadataDetails.add(detail(0)) + val carbonLock = CarbonLockFactory.getCarbonLockObj( Review comment: done
Indhumathi27 commented on a change in pull request #4067: URL: https://github.com/apache/carbondata/pull/4067#discussion_r549670016 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala ########## @@ -462,47 +462,67 @@ object CarbonIndexUtil { } } } - // check for the skipped segments. compare the main table and SI table table - // status file and get the skipped segments if any - CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala - .foreach(metadataDetail => { - if (repairLimit > failedLoadMetadataDetails.size()) { - val detail = siTblLoadMetadataDetails - .filter(metadata => metadata.getLoadName.equals(metadataDetail)) - val mainTableDetail = mainTableDetails - .filter(metadata => metadata.getLoadName.equals(metadataDetail)) - if (null == detail || detail.length == 0) { - val newDetails = new LoadMetadataDetails - newDetails.setLoadName(metadataDetail) - LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" + - " table " + indexTableName + "." + carbonTable.getTableName) - failedLoadMetadataDetails.add(newDetails) - } else if (detail != null && detail.length != 0 && metadataDetail != null - && metadataDetail.length != 0) { - // If SI table has compacted segments and main table does not have - // compacted segments due to some failure while compaction, need to - // reload the original segments in this case. - if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED && - mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) { - detail(0).setSegmentStatus(SegmentStatus.SUCCESS) - // in concurrent scenario, if a compaction is going on table, then SI - // segments are updated first in table status and then the main table - // segment, so in any load runs parallel this listener shouldn't consider - // those segments accidentally. So try to take the segment lock. 
- val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory - .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, - CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) + - LockUsage.LOCK) - if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) { - segmentLocks += segmentLockOfProbableOnCompactionSeg - LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI " - + "table " + indexTableName + "." + carbonTable.getTableName) - failedLoadMetadataDetails.add(detail(0)) + val carbonLock = CarbonLockFactory.getCarbonLockObj( + carbonTable.getAbsoluteTableIdentifier, + LockUsage.COMPACTION_LOCK) + try { + // In some cases, SI table segment might be in COMPACTED state and main table + // compaction might be still in progress. In those cases, we can try to take compaction lock + // on main table and then compare and add SI segments to failedLoads, to avoid repair + // SI SUCCESS loads. + if (carbonLock.lockWithRetries(3, 0)) { + val newMainTableDetails = SegmentStatusManager.readLoadMetadata(carbonTable + .getMetadataPath) + // check for the skipped segments. compare the main table and SI table table + // status file and get the skipped segments if any + CarbonInternalLoaderUtil.getListOfValidSlices(newMainTableDetails).asScala + .foreach(metadataDetail => { + if (repairLimit > failedLoadMetadataDetails.size()) { + val detail = siTblLoadMetadataDetails + .filter(metadata => metadata.getLoadName.equals(metadataDetail)) + val mainTableDetail = newMainTableDetails + .filter(metadata => metadata.getLoadName.equals(metadataDetail)) + if (null == detail || detail.length == 0) { + val newDetails = new LoadMetadataDetails + newDetails.setLoadName(metadataDetail) + LOGGER.error( + "Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" + + " table " + indexTableName + "." 
+ carbonTable.getTableName)
+ failedLoadMetadataDetails.add(newDetails)
+ } else if (detail != null && detail.length != 0 && metadataDetail != null
+ && metadataDetail.length != 0) {
+ // If SI table has compacted segments and main table does not have
+ // compacted segments due to some failure while compaction, need to
+ // reload the original segments in this case.
+ if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
+ mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
+ detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
+ // in concurrent scenario, if a compaction is going on table, then SI
+ // segments are updated first in table status and then the main table
+ // segment, so in any load runs parallel this listener shouldn't consider
+ // those segments accidentally. So try to take the segment lock.
+ val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
+ .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+ CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
+ LockUsage.LOCK)
+ if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
+ segmentLocks += segmentLockOfProbableOnCompactionSeg
+ LOGGER.error(
+ "Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI " +
+ "table " + indexTableName + "." + carbonTable.getTableName)
+ failedLoadMetadataDetails.add(detail(0))
+ }
+ }
  }
  }
- }
- }
- })
+ })
+ } else {
+ LOGGER.error(
+ "Unable to obtain compaction lock for table" + carbonTable.getTableUniqueName)

Review comment:
done

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -462,47 +462,67 @@ object CarbonIndexUtil {
  }
  }
  }
- // check for the skipped segments. compare the main table and SI table table
- // status file and get the skipped segments if any
- CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
- .foreach(metadataDetail => {
- if (repairLimit > failedLoadMetadataDetails.size()) {
- val detail = siTblLoadMetadataDetails
- .filter(metadata => metadata.getLoadName.equals(metadataDetail))
- val mainTableDetail = mainTableDetails
- .filter(metadata => metadata.getLoadName.equals(metadataDetail))
- if (null == detail || detail.length == 0) {
- val newDetails = new LoadMetadataDetails
- newDetails.setLoadName(metadataDetail)
- LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
- " table " + indexTableName + "." + carbonTable.getTableName)
- failedLoadMetadataDetails.add(newDetails)
- } else if (detail != null && detail.length != 0 && metadataDetail != null
- && metadataDetail.length != 0) {
- // If SI table has compacted segments and main table does not have
- // compacted segments due to some failure while compaction, need to
- // reload the original segments in this case.
- if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
- mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
- detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
- // in concurrent scenario, if a compaction is going on table, then SI
- // segments are updated first in table status and then the main table
- // segment, so in any load runs parallel this listener shouldn't consider
- // those segments accidentally. So try to take the segment lock.
- val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
- .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
- CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
- LockUsage.LOCK)
- if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
- segmentLocks += segmentLockOfProbableOnCompactionSeg
- LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI "
- + "table " + indexTableName + "." + carbonTable.getTableName)
- failedLoadMetadataDetails.add(detail(0))
+ val carbonLock = CarbonLockFactory.getCarbonLockObj(
+ carbonTable.getAbsoluteTableIdentifier,
+ LockUsage.COMPACTION_LOCK)
+ try {
+ // In some cases, SI table segment might be in COMPACTED state and main table
+ // compaction might be still in progress. In those cases, we can try to take compaction lock
+ // on main table and then compare and add SI segments to failedLoads, to avoid repair
+ // SI SUCCESS loads.
+ if (carbonLock.lockWithRetries(3, 0)) {

Review comment:
done

Indhumathi27 commented on a change in pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#discussion_r549670126

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -462,47 +462,67 @@ object CarbonIndexUtil {
  }
  }
  }
- // check for the skipped segments. compare the main table and SI table table
- // status file and get the skipped segments if any
- CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
- .foreach(metadataDetail => {
- if (repairLimit > failedLoadMetadataDetails.size()) {
- val detail = siTblLoadMetadataDetails
- .filter(metadata => metadata.getLoadName.equals(metadataDetail))
- val mainTableDetail = mainTableDetails
- .filter(metadata => metadata.getLoadName.equals(metadataDetail))
- if (null == detail || detail.length == 0) {
- val newDetails = new LoadMetadataDetails
- newDetails.setLoadName(metadataDetail)
- LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
- " table " + indexTableName + "." + carbonTable.getTableName)
- failedLoadMetadataDetails.add(newDetails)
- } else if (detail != null && detail.length != 0 && metadataDetail != null
- && metadataDetail.length != 0) {
- // If SI table has compacted segments and main table does not have
- // compacted segments due to some failure while compaction, need to
- // reload the original segments in this case.
- if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
- mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
- detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
- // in concurrent scenario, if a compaction is going on table, then SI
- // segments are updated first in table status and then the main table
- // segment, so in any load runs parallel this listener shouldn't consider
- // those segments accidentally. So try to take the segment lock.
- val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
- .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
- CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
- LockUsage.LOCK)
- if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
- segmentLocks += segmentLockOfProbableOnCompactionSeg
- LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI "
- + "table " + indexTableName + "." + carbonTable.getTableName)
- failedLoadMetadataDetails.add(detail(0))
+ val carbonLock = CarbonLockFactory.getCarbonLockObj(
+ carbonTable.getAbsoluteTableIdentifier,
+ LockUsage.COMPACTION_LOCK)
+ try {
+ // In some cases, SI table segment might be in COMPACTED state and main table
+ // compaction might be still in progress. In those cases, we can try to take compaction lock
+ // on main table and then compare and add SI segments to failedLoads, to avoid repair
+ // SI SUCCESS loads.
+ if (carbonLock.lockWithRetries(3, 0)) {
+ val newMainTableDetails = SegmentStatusManager.readLoadMetadata(carbonTable

Review comment:
modified

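The pattern discussed in the diff above (take the main table's compaction lock with a bounded number of retries, do the comparison only while holding it, and always release it) might look roughly like the sketch below. This is an illustration only, not the code from the PR: `CompactionLockSketch` and `withLockRetries` are hypothetical names, and a plain `java.util.concurrent.locks.ReentrantLock` stands in for the CarbonData lock object, whose exact API is not reproduced here.

```scala
import java.util.concurrent.locks.ReentrantLock

// Hypothetical sketch: a JDK lock stands in for the table-level compaction lock.
object CompactionLockSketch {

  /**
   * Tries to acquire `lock` up to `retries` times, sleeping `sleepMillis`
   * between attempts. Runs `body` only while the lock is held and always
   * releases the lock afterwards. Returns None if the lock was never acquired.
   */
  def withLockRetries[A](lock: ReentrantLock, retries: Int, sleepMillis: Long)
                        (body: => A): Option[A] = {
    var acquired = false
    var attempt = 0
    while (!acquired && attempt < retries) {
      acquired = lock.tryLock()
      attempt += 1
      // Only sleep if another attempt is still going to be made.
      if (!acquired && attempt < retries) Thread.sleep(sleepMillis)
    }
    if (acquired) {
      // finally guarantees the unlock even if body throws.
      try Some(body) finally lock.unlock()
    } else {
      None
    }
  }
}
```

In this sketch the caller would put the "re-read the main table status and compare it with the SI table status" step inside `body`, and treat a `None` result as "compaction lock not available, skip the repair for now".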
CarbonDataQA2 commented on pull request #4067: URL: https://github.com/apache/carbondata/pull/4067#issuecomment-752044813 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3497/
CarbonDataQA2 commented on pull request #4067: URL: https://github.com/apache/carbondata/pull/4067#issuecomment-752044968 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5258/
CarbonDataQA2 commented on pull request #4067: URL: https://github.com/apache/carbondata/pull/4067#issuecomment-752069900 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3498/
CarbonDataQA2 commented on pull request #4067: URL: https://github.com/apache/carbondata/pull/4067#issuecomment-752070020 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5259/
ajantha-bhat commented on a change in pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#discussion_r549715981

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -406,158 +407,195 @@ object CarbonIndexUtil {
  .asInstanceOf[CarbonRelation]
  .carbonTable
- val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
- SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
  var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
- if (!CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
- mainTableDetails.toArray,
- siTblLoadMetadataDetails)) {
- val indexColumns = indexMetadata.getIndexColumns(secondaryIndexProvider,
- indexTableName)
- val indexModel = IndexModel(Some(carbonTable.getDatabaseName),
- indexMetadata.getParentTableName,
- indexColumns.split(",").toList,
- indexTableName)
-
- // var details = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
- // If it empty, then no need to do further computations because the
- // tabletstatus might not have been created and hence next load will take care
- if (siTblLoadMetadataDetails.isEmpty) {
- Seq.empty
- }
+ val compactionLock = CarbonLockFactory.getCarbonLockObj(
+ carbonTable.getAbsoluteTableIdentifier,
+ LockUsage.COMPACTION_LOCK)
+ try {
+ // In some cases, SI table segment might be in COMPACTED state and main table
+ // compaction might be still in progress. In those cases, we can try to take compaction lock
+ // on main table and then compare and add SI segments to failedLoads, to avoid repair
+ // SI SUCCESS loads.
+ if (compactionLock.lockWithRetries()) {
+ var mainTableDetails = try {
+ SegmentStatusManager.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(
+ carbonTable.getTablePath))
+ } catch {
+ case _: Exception =>
+ if (!isLoadOrCompaction) {
+ throw new ConcurrentOperationException(carbonTable.getDatabaseName,

Review comment:
I don't think this needs to be a concurrent-operation exception. The read can only throw an IO exception, so in that case just rethrow the same exception.

ajantha-bhat commented on a change in pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#discussion_r549716227

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -406,158 +407,195 @@ object CarbonIndexUtil {
  .asInstanceOf[CarbonRelation]
  .carbonTable
- val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
- SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
  var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
- if (!CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
- mainTableDetails.toArray,
- siTblLoadMetadataDetails)) {
- val indexColumns = indexMetadata.getIndexColumns(secondaryIndexProvider,
- indexTableName)
- val indexModel = IndexModel(Some(carbonTable.getDatabaseName),
- indexMetadata.getParentTableName,
- indexColumns.split(",").toList,
- indexTableName)
-
- // var details = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
- // If it empty, then no need to do further computations because the
- // tabletstatus might not have been created and hence next load will take care
- if (siTblLoadMetadataDetails.isEmpty) {
- Seq.empty
- }
+ val compactionLock = CarbonLockFactory.getCarbonLockObj(
+ carbonTable.getAbsoluteTableIdentifier,
+ LockUsage.COMPACTION_LOCK)
+ try {
+ // In some cases, SI table segment might be in COMPACTED state and main table
+ // compaction might be still in progress. In those cases, we can try to take compaction lock
+ // on main table and then compare and add SI segments to failedLoads, to avoid repair
+ // SI SUCCESS loads.
+ if (compactionLock.lockWithRetries()) {
+ var mainTableDetails = try {
+ SegmentStatusManager.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(
+ carbonTable.getTablePath))
+ } catch {
+ case _: Exception =>
+ if (!isLoadOrCompaction) {
+ throw new ConcurrentOperationException(carbonTable.getDatabaseName,
+ carbonTable.getTableName, "table status read", "reindex command")
+ }
+ return;

Review comment:
In the else case, it would be better to add an error log.

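The two review suggestions on this catch block (rethrow the original IO exception instead of wrapping it, and log an error on the path that returns silently) could be combined along the lines of the sketch below. It is a minimal illustration under stated assumptions, not the PR's code: `TableStatusReadSketch`, `readStatus`, and `readOrSkip` are hypothetical names, a plain file read stands in for the table status read, and `System.err` stands in for the project's logger.

```scala
import java.io.IOException
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// Hypothetical sketch of the error handling suggested in the review comments.
object TableStatusReadSketch {

  // Stand-in for the table status read; throws IOException on failure.
  @throws[IOException]
  def readStatus(path: String): String =
    new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8)

  /**
   * Returns the file content, or None when the read fails during a load or
   * compaction (after logging). For any other caller the original
   * IOException is rethrown unchanged.
   */
  def readOrSkip(path: String, isLoadOrCompaction: Boolean): Option[String] =
    try {
      Some(readStatus(path))
    } catch {
      case e: IOException =>
        if (!isLoadOrCompaction) {
          throw e // propagate the same exception rather than wrapping it
        }
        // Log before giving up so the skipped repair is visible.
        System.err.println(s"Could not read table status at $path: ${e.getMessage}")
        None
    }
}
```

Catching `IOException` specifically, rather than `Exception`, keeps unrelated failures from being swallowed on the load/compaction path.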