ajantha-bhat opened a new pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871

### Why is this PR needed?
- Auto compaction/minor compaction was happening multiple times for the same segments.
- Segment file write failures and table status update failures in the executor (for merge index and merge data files) are not handled.
- When compaction fails, there is no need to call merge index.
- The segment file is not cleaned up when the table status update fails during compaction.
- Some table status retry issues.

### What changes were proposed in this PR?

### Does this PR introduce any user interface change?
- No
- Yes. (please explain the change and update document)

### Is any new testcase added?
- No
- Yes

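The failure handling described above boils down to a snapshot-and-rollback around the segment file write. Below is a minimal Java sketch of that pattern, assuming the CarbonData helpers that appear in the review diffs further down (`SegmentFileStore`, `SegmentStatusManager`, `FileFactory`); the wrapper method and its parameter list are illustrative, not the PR's actual code:

```java
import java.util.List;

import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;

public final class MergeIndexRollbackSketch {

  /**
   * Writes the merged segment file; if the write fails, deletes the merge
   * index files created earlier (keeping the original index files) and
   * rethrows so the caller can retry or fail the compaction.
   */
  static void writeSegmentFileWithRollback(SegmentFileStore.SegmentFile segmentFile,
      String segmentFilePath, List<String> mergeIndexFilePaths) throws Exception {
    // snapshot the previous content first; the PR reads it back so a failed
    // update can be restored to this state (held but unused in this sketch)
    String previousContent = SegmentStatusManager.readFileAsString(segmentFilePath);
    try {
      SegmentFileStore.writeSegmentFile(segmentFile, segmentFilePath);
    } catch (Exception ex) {
      // roll back: delete every merge index file created for this segment
      for (String mergeIndexFilePath : mergeIndexFilePaths) {
        FileFactory.getCarbonFile(mergeIndexFilePath).delete();
      }
      throw ex;
    }
  }
}
```
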
CarbonDataQA1 commented on pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#issuecomment-665517022

akashrn5 commented on pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#issuecomment-667921823

@ajantha-bhat please resolve conflicts.

akashrn5 commented on a change in pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#discussion_r464307098

########## File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java ##########
@@ -234,13 +235,15 @@ public String writeMergeIndexFileBasedOnSegmentFile(String segmentId, } if (FileFactory.getCarbonFile(entry.getKey()).equals(FileFactory.getCarbonFile(location))) { segment.getValue().setMergeFileName(mergeIndexFile); + mergeIndexFiles.add(entry.getKey() + "/" + mergeIndexFile); segment.getValue().setFiles(new HashSet<String>()); break; } } if (table.isHivePartitionTable()) { for (PartitionSpec partitionSpec : partitionSpecs) { if (partitionSpec.getLocation().toString().equals(partitionPath)) { + // TODO: handle failure here

Review comment: Does this TODO need to be handled in this PR? Please handle it if so.

########## File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java ##########
@@ -234,13 +235,15 @@ public String writeMergeIndexFileBasedOnSegmentFile(String segmentId, } if (FileFactory.getCarbonFile(entry.getKey()).equals(FileFactory.getCarbonFile(location))) { segment.getValue().setMergeFileName(mergeIndexFile); + mergeIndexFiles.add(entry.getKey() + "/" + mergeIndexFile);

Review comment: Use the `FILE_SEPARATOR` constant.

########## File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala ##########
@@ -373,11 +381,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, } else { true } - // here either of the conditions can be true, when delete segment is fired after compaction - // has started, statusFileUpdation will be false , but at the same time commitComplete can be

Review comment: Here, instead of deleting the complete comment, maybe you can rewrite the part of the comment that is still required for `commitComplete`.

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala ##########
@@ -357,7 +357,6 @@ object SecondaryIndexCreator { if (null != executorService) { executorService.shutdownNow() } -

Review comment: Revert this change.

########## File path: processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java ##########
@@ -54,39 +59,77 @@ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable, final boolean isCompactionFlow) throws IOException { String metaDataLocation = carbonTable.getMetadataPath(); - //delete folder which metadata no exist in tablestatus - String partitionPath = CarbonTablePath.getPartitionDir(carbonTable.getTablePath()); - if (FileFactory.isFileExist(partitionPath)) { - final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation); - CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath); - CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() { - @Override - public boolean accept(CarbonFile path) { - String segmentId = - CarbonTablePath.DataFileUtil.getSegmentIdFromPath(path.getAbsolutePath() + "/dummy"); - boolean found = false; - for (int j = 0; j < details.length; j++) { - if (details[j].getLoadName().equals(segmentId)) { - found = true; - break; + ICarbonLock carbonTableStatusLock = CarbonLockFactory + .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK); + try { + int retryCount = CarbonLockUtil + .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK, + CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT); + int maxTimeout = CarbonLockUtil + .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK, + CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT); + if (carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout)) { + //delete folder which metadata no exist in tablestatus + String partitionPath = CarbonTablePath.getPartitionDir(carbonTable.getTablePath()); + if (FileFactory.isFileExist(partitionPath)) { + final LoadMetadataDetails[] details = + SegmentStatusManager.readLoadMetadata(metaDataLocation); + CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath); + CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile path) { + String segmentId = CarbonTablePath.DataFileUtil + .getSegmentIdFromPath(path.getAbsolutePath() + "/dummy"); + boolean found = false; + for (int j = 0; j < details.length; j++) { + if (details[j].getLoadName().equals(segmentId)) { + found = true; + break; + } + } + return !found; + } + }); + CarbonFile[] allSegmentFiles = null; + if (isCompactionFlow) { + String segmentFilesLocation = + CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath()); + allSegmentFiles = FileFactory.getCarbonFile(segmentFilesLocation).listFiles(); + } + for (int k = 0; k < listFiles.length; k++) { + String segmentId = CarbonTablePath.DataFileUtil + .getSegmentIdFromPath(listFiles[k].getAbsolutePath() + "/dummy"); + if (isCompactionFlow) { + if (segmentId.contains(".")) { + CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath()); + // if segment file exist delete it. + for (CarbonFile segment : allSegmentFiles) { + if (segment.getPath().contains(segmentId)) { + segment.delete(); + break; + } + } + } + } else { + if (!segmentId.contains(".")) { + CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath()); + } } } - return !found; } - }); - for (int k = 0; k < listFiles.length; k++) { - String segmentId = CarbonTablePath.DataFileUtil - .getSegmentIdFromPath(listFiles[k].getAbsolutePath() + "/dummy"); + } else { if (isCompactionFlow) { - if (segmentId.contains(".")) { - CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath()); - } - } else { - if (!segmentId.contains(".")) { - CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath()); - } + LOGGER.error( + "Not able to acquire the Table status lock for partial load deletion for table " + + carbonTable.getDatabaseName() + "." + carbonTable.getTableName() + + ", retry compaction"); + throw new RuntimeException(

Review comment: Do we really need to throw an exception here? Since it is just cleanup, we could log an error message and retry next time instead of failing the operation. What do you think?

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java ##########
@@ -173,11 +180,15 @@ public static boolean recordLoadMetadata(List<LoadMetadataDetails> newLoadMetada LOGGER.error( "Not able to acquire the lock for Table status updation for table " + databaseName + "." + tableName); + throw new RuntimeException( + "Not able to acquire the lock for Table status updation for table " + databaseName + "." + + tableName); } } catch (IOException e) { LOGGER.error( "Not able to acquire the lock for Table status updation for table " + databaseName + "." + tableName); +

Review comment: Revert this change.

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala ##########
@@ -235,18 +235,6 @@ private[sql] case class CarbonCreateSecondaryIndexCommand( throw new ErrorMessage(s"Number of columns in Index table cannot be more than " + "number of key columns in Source table") } - - var isColsIndexedAsPerTable = true

Review comment: Can you please tell me why this change is required?

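The cleanup in `TableProcessingOperations` now runs only under the table status lock, using the configurable retry properties shown in the diff. A condensed sketch of that locking pattern, reusing the calls from the hunk above (`CarbonLockUtil.getLockProperty`, `CarbonLockFactory.getCarbonLockObj`, `lockWithRetries`); the wrapper method and its `Runnable` parameter are hypothetical, and `unlock()` is assumed to be the standard `ICarbonLock` release call:

```java
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.locks.CarbonLockFactory;
import org.apache.carbondata.core.locks.CarbonLockUtil;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.locks.LockUsage;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;

public final class TableStatusLockSketch {

  /** Runs the given cleanup only while holding the table status lock. */
  static void runUnderTableStatusLock(CarbonTable table, Runnable cleanup) {
    // retry count and timeout are configurable, exactly as in the diff above
    int retryCount = CarbonLockUtil.getLockProperty(
        CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
        CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
    int maxTimeout = CarbonLockUtil.getLockProperty(
        CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
        CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
    ICarbonLock lock = CarbonLockFactory.getCarbonLockObj(
        table.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK);
    try {
      if (lock.lockWithRetries(retryCount, maxTimeout)) {
        cleanup.run();  // e.g. delete stale segment folders and metadata files
      } else {
        throw new RuntimeException("Not able to acquire the table status lock for table "
            + table.getDatabaseName() + "." + table.getTableName());
      }
    } finally {
      lock.unlock();  // assumed ICarbonLock method; release even on failure
    }
  }
}
```
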
akashrn5 commented on pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#issuecomment-667926580

@ajantha-bhat please create a JIRA and update the PR description.

ajantha-bhat commented on pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#issuecomment-667927752

@akashrn5: This is a WIP PR, please wait.

CarbonDataQA1 commented on pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#issuecomment-691855754

Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4057/

CarbonDataQA1 commented on pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#issuecomment-691858887

Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2319/

CarbonDataQA1 commented on pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#issuecomment-691996188

Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2327/

ajantha-bhat commented on a change in pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#discussion_r487850404

########## File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala ##########
@@ -373,11 +381,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, } else { true } - // here either of the conditions can be true, when delete segment is fired after compaction - // has started, statusFileUpdation will be false , but at the same time commitComplete can be

Review comment: The existing comment in the code explained why we needed to consider both flags. Now only one flag is there, so it is understandable. If it is still not understandable with the new code and you have a suggestion for the comment, you can give it here and I will add it.

ajantha-bhat commented on a change in pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#discussion_r487851114

########## File path: processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java ##########
@@ -54,39 +59,77 @@ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable, final boolean isCompactionFlow) throws IOException { String metaDataLocation = carbonTable.getMetadataPath(); - //delete folder which metadata no exist in tablestatus - String partitionPath = CarbonTablePath.getPartitionDir(carbonTable.getTablePath()); - if (FileFactory.isFileExist(partitionPath)) { - final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation); - CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath); - CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() { - @Override - public boolean accept(CarbonFile path) { - String segmentId = - CarbonTablePath.DataFileUtil.getSegmentIdFromPath(path.getAbsolutePath() + "/dummy"); - boolean found = false; - for (int j = 0; j < details.length; j++) { - if (details[j].getLoadName().equals(segmentId)) { - found = true; - break; + ICarbonLock carbonTableStatusLock = CarbonLockFactory + .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK); + try { + int retryCount = CarbonLockUtil + .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK, + CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT); + int maxTimeout = CarbonLockUtil + .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK, + CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT); + if (carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout)) { + //delete folder which metadata no exist in tablestatus + String partitionPath = CarbonTablePath.getPartitionDir(carbonTable.getTablePath()); + if (FileFactory.isFileExist(partitionPath)) { + final LoadMetadataDetails[] details = + SegmentStatusManager.readLoadMetadata(metaDataLocation); + CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath); + CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile path) { + String segmentId = CarbonTablePath.DataFileUtil + .getSegmentIdFromPath(path.getAbsolutePath() + "/dummy"); + boolean found = false; + for (int j = 0; j < details.length; j++) { + if (details[j].getLoadName().equals(segmentId)) { + found = true; + break; + } + } + return !found; + } + }); + CarbonFile[] allSegmentFiles = null; + if (isCompactionFlow) { + String segmentFilesLocation = + CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath()); + allSegmentFiles = FileFactory.getCarbonFile(segmentFilesLocation).listFiles(); + } + for (int k = 0; k < listFiles.length; k++) { + String segmentId = CarbonTablePath.DataFileUtil + .getSegmentIdFromPath(listFiles[k].getAbsolutePath() + "/dummy"); + if (isCompactionFlow) { + if (segmentId.contains(".")) { + CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath()); + // if segment file exist delete it. + for (CarbonFile segment : allSegmentFiles) { + if (segment.getPath().contains(segmentId)) { + segment.delete(); + break; + } + } + } + } else { + if (!segmentId.contains(".")) { + CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath()); + } } } - return !found; } - }); - for (int k = 0; k < listFiles.length; k++) { - String segmentId = CarbonTablePath.DataFileUtil - .getSegmentIdFromPath(listFiles[k].getAbsolutePath() + "/dummy"); + } else { if (isCompactionFlow) { - if (segmentId.contains(".")) { - CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath()); - } - } else { - if (!segmentId.contains(".")) { - CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath()); - } + LOGGER.error( + "Not able to acquire the Table status lock for partial load deletion for table " + + carbonTable.getDatabaseName() + "." + carbonTable.getTableName() + + ", retry compaction"); + throw new RuntimeException(

Review comment: Yes, I faced this issue in the compaction flow. If the stale segments are not cleared, there will be duplicate data after compaction, so I cannot go ahead with the compaction while the stale data is not cleaned. Hence the exception is thrown here, to retry the compaction.

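The behavior being defended here differs by flow: in compaction, leftover stale segments would be picked up again and produce duplicate rows in the merged segment, so the operation must fail and be retried; in loading, the cleanup can simply be attempted again next time. A sketch of that branch, with the logger wiring assumed to follow CarbonData's usual `LogServiceFactory` pattern:

```java
import org.apache.log4j.Logger;

import org.apache.carbondata.common.logging.LogServiceFactory;

public final class LockFailureHandlingSketch {

  private static final Logger LOGGER =
      LogServiceFactory.getLogService(LockFailureHandlingSketch.class.getName());

  /**
   * Handles a failed table status lock acquisition: fatal for compaction
   * (stale segments would duplicate data in the merged segment), tolerable
   * for loading (stale segments are retried on the next run).
   */
  static void handleLockFailure(String databaseName, String tableName,
      boolean isCompactionFlow) {
    String errorMessage = "Not able to acquire the table status lock for partial "
        + "load deletion for table " + databaseName + "." + tableName;
    if (isCompactionFlow) {
      LOGGER.error(errorMessage + ", retry compaction");
      throw new RuntimeException(errorMessage + ", retry compaction");
    }
    // load flow: log and move on; cleanup will be attempted again next time
    LOGGER.error(errorMessage);
  }
}
```
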
CarbonDataQA1 commented on pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#issuecomment-692003279

Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4066/

ajantha-bhat edited a comment on pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#issuecomment-692003597

@akashrn5, it was a WIP PR. Now I got some time and worked on it; removed WIP now. The PR is ready, you can freshly review and merge.

akashrn5 commented on a change in pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#discussion_r487993198

########## File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java ##########
@@ -233,15 +234,24 @@ public String writeMergeIndexFileBasedOnSegmentFile(String segmentId, } if (FileFactory.getCarbonFile(entry.getKey()).equals(FileFactory.getCarbonFile(location))) { segment.getValue().setMergeFileName(mergeIndexFile); - segment.getValue().setFiles(new HashSet<String>()); + mergeIndexFiles + .add(entry.getKey() + CarbonCommonConstants.FILE_SEPARATOR + mergeIndexFile); + segment.getValue().setFiles(new HashSet<>()); break; } } if (table.isHivePartitionTable()) { for (PartitionSpec partitionSpec : partitionSpecs) { if (partitionSpec.getLocation().toString().equals(partitionPath)) { - SegmentFileStore.writeSegmentFile(table.getTablePath(), mergeIndexFile, partitionPath, - segmentId + "_" + uuid + "", partitionSpec.getPartitions(), true); + try { + SegmentFileStore.writeSegmentFile(table.getTablePath(), mergeIndexFile, partitionPath, + segmentId + "_" + uuid + "", partitionSpec.getPartitions(), true); + } catch (Exception ex) { + // delete merge index file if created, + // keep only index files as segment file writing is failed

Review comment: Can you add an error log here, with just the message and not the throwable?

########## File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java ##########
@@ -251,9 +261,29 @@ String path = CarbonTablePath.getSegmentFilesLocation(table.getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + newSegmentFileName; if (!table.isHivePartitionTable()) { - SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path); - SegmentFileStore.updateTableStatusFile(table, segmentId, newSegmentFileName, + String content = SegmentStatusManager.readFileAsString(path); + try { + SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path); + } catch (Exception ex) { + // delete merge index file if created, + // keep only index files as segment file writing is failed + for (String file : mergeIndexFiles) {

Review comment:
```suggestion
for (String mergeIndexFile : mergeIndexFiles) {
```

########## File path: processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java ##########
@@ -64,45 +68,89 @@ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable, if (allSegments == null || allSegments.length == 0) { return; } - LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation); - // there is no segment or failed to read tablestatus file. - // so it should stop immediately. - if (details == null || details.length == 0) { - return; - } - Set<String> metadataSet = new HashSet<>(details.length); - for (LoadMetadataDetails detail : details) { - metadataSet.add(detail.getLoadName()); - } - List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length); - for (CarbonFile segment : allSegments) { - String segmentName = segment.getName(); - // check segment folder pattern - if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) { - String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE); - if (parts.length == 2) { - boolean isOriginal = !parts[1].contains("."); - if (isCompactionFlow) { - // in compaction flow, it should be big segment and segment metadata is not exists - if (!isOriginal && !metadataSet.contains(parts[1])) { - staleSegments.add(segment); + int retryCount = CarbonLockUtil + .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK, + CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT); + int maxTimeout = CarbonLockUtil + .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK, + CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT); + ICarbonLock carbonTableStatusLock = CarbonLockFactory + .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK); + try { + if (carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout)) { + LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation); + // there is no segment or failed to read tablestatus file. + // so it should stop immediately. + if (details == null || details.length == 0) { + return; + } + Set<String> metadataSet = new HashSet<>(details.length); + for (LoadMetadataDetails detail : details) { + metadataSet.add(detail.getLoadName()); + } + List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length); + Set<String> staleSegmentsId = new HashSet<>(allSegments.length); + for (CarbonFile segment : allSegments) { + String segmentName = segment.getName(); + // check segment folder pattern + if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) { + String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE); + if (parts.length == 2) { + boolean isOriginal = !parts[1].contains("."); + if (isCompactionFlow) { + // in compaction flow, it should be big segment and segment metadata is not exists + if (!isOriginal && !metadataSet.contains(parts[1])) { + staleSegments.add(segment); + staleSegmentsId.add(parts[1]); + } + } else { + // in loading flow, + // it should be original segment and segment metadata is not exists

Review comment:
```suggestion
// it should be original segment and segment metadata does not exist
```

########## File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java ##########
@@ -233,15 +234,24 @@ public String writeMergeIndexFileBasedOnSegmentFile(String segmentId, } if (FileFactory.getCarbonFile(entry.getKey()).equals(FileFactory.getCarbonFile(location))) { segment.getValue().setMergeFileName(mergeIndexFile); - segment.getValue().setFiles(new HashSet<String>()); + mergeIndexFiles + .add(entry.getKey() + CarbonCommonConstants.FILE_SEPARATOR + mergeIndexFile); + segment.getValue().setFiles(new HashSet<>()); break; } } if (table.isHivePartitionTable()) { for (PartitionSpec partitionSpec : partitionSpecs) { if (partitionSpec.getLocation().toString().equals(partitionPath)) { - SegmentFileStore.writeSegmentFile(table.getTablePath(), mergeIndexFile, partitionPath, - segmentId + "_" + uuid + "", partitionSpec.getPartitions(), true); + try { + SegmentFileStore.writeSegmentFile(table.getTablePath(), mergeIndexFile, partitionPath, + segmentId + "_" + uuid + "", partitionSpec.getPartitions(), true);

Review comment: I know it's base code, but can you replace `_` with the constant `UNDERSCORE` and name the boolean variable?

########## File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java ##########
@@ -251,9 +261,29 @@ String path = CarbonTablePath.getSegmentFilesLocation(table.getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + newSegmentFileName; if (!table.isHivePartitionTable()) { - SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path); - SegmentFileStore.updateTableStatusFile(table, segmentId, newSegmentFileName, + String content = SegmentStatusManager.readFileAsString(path); + try { + SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path); + } catch (Exception ex) { + // delete merge index file if created,

Review comment: Can you add an error log here, with just the message and not the throwable? It will help with analysis of any issues.

########## File path: processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java ##########
@@ -64,45 +68,89 @@ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable, if (allSegments == null || allSegments.length == 0) { return; } - LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation); - // there is no segment or failed to read tablestatus file. - // so it should stop immediately. - if (details == null || details.length == 0) { - return; - } - Set<String> metadataSet = new HashSet<>(details.length); - for (LoadMetadataDetails detail : details) { - metadataSet.add(detail.getLoadName()); - } - List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length); - for (CarbonFile segment : allSegments) { - String segmentName = segment.getName(); - // check segment folder pattern - if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) { - String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE); - if (parts.length == 2) { - boolean isOriginal = !parts[1].contains("."); - if (isCompactionFlow) { - // in compaction flow, it should be big segment and segment metadata is not exists - if (!isOriginal && !metadataSet.contains(parts[1])) { - staleSegments.add(segment); + int retryCount = CarbonLockUtil + .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK, + CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT); + int maxTimeout = CarbonLockUtil + .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK, + CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT); + ICarbonLock carbonTableStatusLock = CarbonLockFactory + .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK); + try { + if (carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout)) { + LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation); + // there is no segment or failed to read tablestatus file. + // so it should stop immediately. + if (details == null || details.length == 0) { + return; + } + Set<String> metadataSet = new HashSet<>(details.length); + for (LoadMetadataDetails detail : details) { + metadataSet.add(detail.getLoadName()); + } + List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length); + Set<String> staleSegmentsId = new HashSet<>(allSegments.length); + for (CarbonFile segment : allSegments) { + String segmentName = segment.getName(); + // check segment folder pattern + if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) { + String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE); + if (parts.length == 2) { + boolean isOriginal = !parts[1].contains("."); + if (isCompactionFlow) { + // in compaction flow, it should be big segment and segment metadata is not exists

Review comment:
```suggestion
// in compaction flow, it should be big segment and segment metadata does not exist
```

########## File path: processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java ##########
@@ -64,45 +68,89 @@ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable, if (allSegments == null || allSegments.length == 0) { return; } - LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation); - // there is no segment or failed to read tablestatus file. - // so it should stop immediately. - if (details == null || details.length == 0) { - return; - } - Set<String> metadataSet = new HashSet<>(details.length); - for (LoadMetadataDetails detail : details) { - metadataSet.add(detail.getLoadName()); - } - List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length); - for (CarbonFile segment : allSegments) { - String segmentName = segment.getName(); - // check segment folder pattern - if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) { - String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE); - if (parts.length == 2) { - boolean isOriginal = !parts[1].contains("."); - if (isCompactionFlow) { - // in compaction flow, it should be big segment and segment metadata is not exists - if (!isOriginal && !metadataSet.contains(parts[1])) { - staleSegments.add(segment); + int retryCount = CarbonLockUtil + .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK, + CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT); + int maxTimeout = CarbonLockUtil + .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK, + CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT); + ICarbonLock carbonTableStatusLock = CarbonLockFactory + .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK); + try { + if (carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout)) { + LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation); + // there is no segment or failed to read tablestatus file. + // so it should stop immediately. + if (details == null || details.length == 0) { + return; + } + Set<String> metadataSet = new HashSet<>(details.length); + for (LoadMetadataDetails detail : details) { + metadataSet.add(detail.getLoadName()); + } + List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length); + Set<String> staleSegmentsId = new HashSet<>(allSegments.length); + for (CarbonFile segment : allSegments) { + String segmentName = segment.getName(); + // check segment folder pattern + if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) { + String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE); + if (parts.length == 2) { + boolean isOriginal = !parts[1].contains("."); + if (isCompactionFlow) { + // in compaction flow, it should be big segment and segment metadata is not exists + if (!isOriginal && !metadataSet.contains(parts[1])) { + staleSegments.add(segment); + staleSegmentsId.add(parts[1]); + } + } else { + // in loading flow, + // it should be original segment and segment metadata is not exists + if (isOriginal && !metadataSet.contains(parts[1])) { + staleSegments.add(segment); + staleSegmentsId.add(parts[1]); + } + } } - } else { - // in loading flow, it should be original segment and segment metadata is not exists - if (isOriginal && !metadataSet.contains(parts[1])) { - staleSegments.add(segment); + } + } + // delete segment folders one by one + for (CarbonFile staleSegment : staleSegments) { + try { + CarbonUtil.deleteFoldersAndFiles(staleSegment); + } catch (IOException | InterruptedException e) { + LOGGER.error("Unable to delete the given path :: " + e.getMessage(), e); + } + } + if (staleSegments.size() > 0) { + // collect the segment metadata path + String segmentFilesLocation = + CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath()); + CarbonFile[] allSegmentMetadataFiles =

Review comment: Instead of listing all the files, you can list only the files whose names start with an id present in `staleSegments` and directly delete those.

########## File path: processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java ##########
@@ -64,45 +68,89 @@ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable, if (allSegments == null || allSegments.length == 0) { return; } - LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation); - // there is no segment or failed to read tablestatus file. - // so it should stop immediately. - if (details == null || details.length == 0) { - return; - } - Set<String> metadataSet = new HashSet<>(details.length); - for (LoadMetadataDetails detail : details) { - metadataSet.add(detail.getLoadName()); - } - List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length); - for (CarbonFile segment : allSegments) { - String segmentName = segment.getName(); - // check segment folder pattern - if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) { - String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE); - if (parts.length == 2) { - boolean isOriginal = !parts[1].contains("."); - if (isCompactionFlow) { - // in compaction flow, it should be big segment and segment metadata is not exists - if (!isOriginal && !metadataSet.contains(parts[1])) { - staleSegments.add(segment); + int retryCount = CarbonLockUtil + .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK, + CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT); + int maxTimeout = CarbonLockUtil + .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK, + CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT); + ICarbonLock carbonTableStatusLock = CarbonLockFactory + .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK); + try { + if (carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout)) { + LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation); + // there is no segment or failed to read tablestatus file. + // so it should stop immediately. + if (details == null || details.length == 0) { + return; + } + Set<String> metadataSet = new HashSet<>(details.length); + for (LoadMetadataDetails detail : details) { + metadataSet.add(detail.getLoadName()); + } + List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length); + Set<String> staleSegmentsId = new HashSet<>(allSegments.length); + for (CarbonFile segment : allSegments) { + String segmentName = segment.getName(); + // check segment folder pattern + if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) { + String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE); + if (parts.length == 2) { + boolean isOriginal = !parts[1].contains("."); + if (isCompactionFlow) { + // in compaction flow, it should be big segment and segment metadata is not exists + if (!isOriginal && !metadataSet.contains(parts[1])) { + staleSegments.add(segment); + staleSegmentsId.add(parts[1]); + } + } else { + // in loading flow, + // it should be original segment and segment metadata is not exists + if (isOriginal && !metadataSet.contains(parts[1])) { + staleSegments.add(segment); + staleSegmentsId.add(parts[1]); + } + } } - } else { - // in loading flow, it should be original segment and segment metadata is not exists - if (isOriginal && !metadataSet.contains(parts[1])) { - staleSegments.add(segment); + } + } + // delete segment folders one by one + for (CarbonFile staleSegment : staleSegments) { + try { + CarbonUtil.deleteFoldersAndFiles(staleSegment); + } catch (IOException | InterruptedException e) { + LOGGER.error("Unable to delete the given path :: " + e.getMessage(), e); + } + } + if (staleSegments.size() > 0) { + // collect the segment metadata path + String segmentFilesLocation = + CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath()); + CarbonFile[] allSegmentMetadataFiles = + FileFactory.getCarbonFile(segmentFilesLocation).listFiles(); + // delete the segment metadata files also + for (CarbonFile segmentMetadataFile : allSegmentMetadataFiles) { + String segmentId = + segmentMetadataFile.getName().split(CarbonCommonConstants.UNDERSCORE)[0]; + if (staleSegmentsId.contains(segmentId)) { + segmentMetadataFile.delete(); } } } + } else { + LOGGER.error( + "Not able to acquire the Table status lock for partial load deletion for table " + + carbonTable.getDatabaseName() + "." + carbonTable.getTableName() + + ", retry compaction");

Review comment: Here the error message is the same as the one below, including "retry compaction". Please check and change it.

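akashrn5's suggestion above (filter while listing instead of listing every segment file) could look roughly like the following sketch; the `CarbonFile.listFiles` filter overload and the `<segmentId>_<timestamp>` naming of segment metadata files are taken from the diffs, while the wrapper method itself is hypothetical:

```java
import java.util.Set;

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.util.path.CarbonTablePath;

public final class StaleSegmentFileCleanupSketch {

  /** Deletes only the segment metadata files that belong to stale segments. */
  static void deleteStaleSegmentMetadataFiles(String tablePath, Set<String> staleSegmentsId) {
    String segmentFilesLocation = CarbonTablePath.getSegmentFilesLocation(tablePath);
    // filter during listing instead of listing everything and filtering later;
    // metadata file names start with "<segmentId>_<timestamp>"
    CarbonFile[] staleSegmentMetadataFiles = FileFactory.getCarbonFile(segmentFilesLocation)
        .listFiles(file -> staleSegmentsId.contains(
            file.getName().split(CarbonCommonConstants.UNDERSCORE)[0]));
    for (CarbonFile staleSegmentMetadataFile : staleSegmentMetadataFiles) {
      staleSegmentMetadataFile.delete();
    }
  }
}
```
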
ajantha-bhat commented on a change in pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#discussion_r488444308

########## File path: processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java ##########
@@ -64,45 +68,89 @@ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable, if (allSegments == null || allSegments.length == 0) { return; } - LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation); - // there is no segment or failed to read tablestatus file. - // so it should stop immediately. - if (details == null || details.length == 0) { - return; - } - Set<String> metadataSet = new HashSet<>(details.length); - for (LoadMetadataDetails detail : details) { - metadataSet.add(detail.getLoadName()); - } - List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length); - for (CarbonFile segment : allSegments) { - String segmentName = segment.getName(); - // check segment folder pattern - if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) { - String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE); - if (parts.length == 2) { - boolean isOriginal = !parts[1].contains("."); - if (isCompactionFlow) { - // in compaction flow, it should be big segment and segment metadata is not exists - if (!isOriginal && !metadataSet.contains(parts[1])) { - staleSegments.add(segment); + int retryCount = CarbonLockUtil + .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK, + CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT); + int maxTimeout = CarbonLockUtil + .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK, + CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT); + ICarbonLock carbonTableStatusLock = CarbonLockFactory + .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK); + try { + if (carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout)) { + LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation); + // there is no segment or failed to read tablestatus file. + // so it should stop immediately. + if (details == null || details.length == 0) { + return; + } + Set<String> metadataSet = new HashSet<>(details.length); + for (LoadMetadataDetails detail : details) { + metadataSet.add(detail.getLoadName()); + } + List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length); + Set<String> staleSegmentsId = new HashSet<>(allSegments.length); + for (CarbonFile segment : allSegments) { + String segmentName = segment.getName(); + // check segment folder pattern + if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) { + String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE); + if (parts.length == 2) { + boolean isOriginal = !parts[1].contains("."); + if (isCompactionFlow) { + // in compaction flow, it should be big segment and segment metadata is not exists + if (!isOriginal && !metadataSet.contains(parts[1])) { + staleSegments.add(segment); + staleSegmentsId.add(parts[1]); + } + } else { + // in loading flow, + // it should be original segment and segment metadata is not exists + if (isOriginal && !metadataSet.contains(parts[1])) { + staleSegments.add(segment); + staleSegmentsId.add(parts[1]); + } + } } - } else { - // in loading flow, it should be original segment and segment metadata is not exists - if (isOriginal && !metadataSet.contains(parts[1])) { - staleSegments.add(segment); + } + } + // delete segment folders one by one + for (CarbonFile staleSegment : staleSegments) { + try { + CarbonUtil.deleteFoldersAndFiles(staleSegment); + } catch (IOException | InterruptedException e) { + LOGGER.error("Unable to delete the given path :: " + e.getMessage(), e); + } + } + if (staleSegments.size() > 0) { + // collect the segment metadata path + String segmentFilesLocation = + CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath()); + CarbonFile[] allSegmentMetadataFiles = + FileFactory.getCarbonFile(segmentFilesLocation).listFiles(); + // delete the segment metadata files also + for (CarbonFile segmentMetadataFile : allSegmentMetadataFiles) { + String segmentId = + segmentMetadataFile.getName().split(CarbonCommonConstants.UNDERSCORE)[0]; + if (staleSegmentsId.contains(segmentId)) { + segmentMetadataFile.delete(); } } } + } else { + LOGGER.error( + "Not able to acquire the Table status lock for partial load deletion for table " + + carbonTable.getDatabaseName() + "." + carbonTable.getTableName() + + ", retry compaction");

Review comment: Assigned it to a variable.

########## File path: processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java ##########
@@ -64,45 +68,89 @@ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable, if (allSegments == null || allSegments.length == 0) { return; } - LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation); - // there is no segment or failed to read tablestatus file. - // so it should stop immediately. - if (details == null || details.length == 0) { - return; - } - Set<String> metadataSet = new HashSet<>(details.length); - for (LoadMetadataDetails detail : details) { - metadataSet.add(detail.getLoadName()); - } - List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length); - for (CarbonFile segment : allSegments) { - String segmentName = segment.getName(); - // check segment folder pattern - if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) { - String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE); - if (parts.length == 2) { - boolean isOriginal = !parts[1].contains("."); - if (isCompactionFlow) { - // in compaction flow, it should be big segment and segment metadata is not exists - if (!isOriginal && !metadataSet.contains(parts[1])) { - staleSegments.add(segment); + int retryCount = CarbonLockUtil + .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK, + CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT); + int maxTimeout = CarbonLockUtil + .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK, + CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT); + ICarbonLock carbonTableStatusLock = CarbonLockFactory + .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK); + try { + if (carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout)) { + LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation); + // there is no segment or failed to read tablestatus file. + // so it should stop immediately. + if (details == null || details.length == 0) { + return; + } + Set<String> metadataSet = new HashSet<>(details.length); + for (LoadMetadataDetails detail : details) { + metadataSet.add(detail.getLoadName()); + } + List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length); + Set<String> staleSegmentsId = new HashSet<>(allSegments.length); + for (CarbonFile segment : allSegments) { + String segmentName = segment.getName(); + // check segment folder pattern + if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) { + String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE); + if (parts.length == 2) { + boolean isOriginal = !parts[1].contains("."); + if (isCompactionFlow) { + // in compaction flow, it should be big segment and segment metadata is not exists + if (!isOriginal && !metadataSet.contains(parts[1])) { + staleSegments.add(segment); + staleSegmentsId.add(parts[1]); + } + } else { + // in loading flow, + // it should be original segment and segment metadata is not exists + if (isOriginal && !metadataSet.contains(parts[1])) { + staleSegments.add(segment); + staleSegmentsId.add(parts[1]); + } + } } - } else { - // in loading flow, it should be original segment and segment metadata is not exists - if (isOriginal && !metadataSet.contains(parts[1])) { - staleSegments.add(segment); + } + } + // delete segment folders one by one + for (CarbonFile staleSegment : staleSegments) { + try { + CarbonUtil.deleteFoldersAndFiles(staleSegment); + } catch (IOException | InterruptedException e) { + LOGGER.error("Unable to delete the given path :: " + e.getMessage(), e); + } + } + if (staleSegments.size() > 0) { + // collect the segment metadata path + String segmentFilesLocation = + CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath()); + CarbonFile[] allSegmentMetadataFiles =

Review comment: OK.

########## File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java ##########
@@ -251,9 +261,29 @@ public String writeMergeIndexFileBasedOnSegmentFile(String segmentId, String path = CarbonTablePath.getSegmentFilesLocation(table.getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + newSegmentFileName; if (!table.isHivePartitionTable()) { - SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path); - SegmentFileStore.updateTableStatusFile(table, segmentId, newSegmentFileName, + String content = SegmentStatusManager.readFileAsString(path); + try { + SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path); + } catch (Exception ex) { + // delete merge index file if created,

Review comment: In both the callers, these exceptions are caught and logged. The behavior is the same.

########## File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java ##########
@@ -233,15 +234,24 @@ public String writeMergeIndexFileBasedOnSegmentFile(String segmentId, } if (FileFactory.getCarbonFile(entry.getKey()).equals(FileFactory.getCarbonFile(location))) { segment.getValue().setMergeFileName(mergeIndexFile); - segment.getValue().setFiles(new HashSet<String>()); + mergeIndexFiles + .add(entry.getKey() + CarbonCommonConstants.FILE_SEPARATOR + mergeIndexFile); + segment.getValue().setFiles(new HashSet<>()); break; } } if (table.isHivePartitionTable()) { for (PartitionSpec partitionSpec : partitionSpecs) { if (partitionSpec.getLocation().toString().equals(partitionPath)) { - SegmentFileStore.writeSegmentFile(table.getTablePath(), mergeIndexFile, partitionPath, - segmentId + "_" + uuid + "", partitionSpec.getPartitions(), true); + try { + SegmentFileStore.writeSegmentFile(table.getTablePath(), mergeIndexFile, partitionPath, + segmentId + "_" + uuid + "", partitionSpec.getPartitions(), true); + } catch (Exception ex) { + // delete merge index file if created, + // keep only index files as segment file writing is failed

Review comment: In both the callers, these exceptions are caught and logged. The behavior is the same.

########## File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java ##########
@@ -233,15 +234,24 @@ public String writeMergeIndexFileBasedOnSegmentFile(String segmentId, } if (FileFactory.getCarbonFile(entry.getKey()).equals(FileFactory.getCarbonFile(location))) { segment.getValue().setMergeFileName(mergeIndexFile); - segment.getValue().setFiles(new HashSet<String>()); + mergeIndexFiles + .add(entry.getKey() + CarbonCommonConstants.FILE_SEPARATOR + mergeIndexFile); + segment.getValue().setFiles(new HashSet<>()); break; } } if (table.isHivePartitionTable()) { for (PartitionSpec partitionSpec : partitionSpecs) { if (partitionSpec.getLocation().toString().equals(partitionPath)) { - SegmentFileStore.writeSegmentFile(table.getTablePath(), mergeIndexFile, partitionPath, - segmentId + "_" + uuid + "", partitionSpec.getPartitions(), true); + try { + SegmentFileStore.writeSegmentFile(table.getTablePath(), mergeIndexFile, partitionPath, + segmentId + "_" + uuid + "", partitionSpec.getPartitions(), true);

Review comment: > name the boolean variable

This is Java, so the argument cannot be named. Handled only the `_` change.

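For the partition-table branch discussed in these replies, the rollback amounts to deleting the just-written merge index file and rethrowing for the callers to catch and log. A sketch, assuming the six-argument `SegmentFileStore.writeSegmentFile` call from the hunks above; the wrapper method, the `mergeIndexFilePath` parameter, and the `PartitionSpec` import location are assumptions:

```java
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;

public final class PartitionSegmentFileWriteSketch {

  /**
   * Writes the segment file for a partition table; on failure, removes the
   * freshly created merge index file (keeping the original index files) and
   * rethrows so the callers can catch and log the failure.
   */
  static void writePartitionSegmentFile(CarbonTable table, String mergeIndexFileName,
      String partitionPath, String segmentId, String uuid, PartitionSpec partitionSpec,
      String mergeIndexFilePath) throws Exception {
    try {
      SegmentFileStore.writeSegmentFile(table.getTablePath(), mergeIndexFileName,
          partitionPath, segmentId + CarbonCommonConstants.UNDERSCORE + uuid,
          partitionSpec.getPartitions(), true);  // trailing boolean as in the diff
    } catch (Exception ex) {
      // roll back the merge index file created just before this call
      FileFactory.getCarbonFile(mergeIndexFilePath).delete();
      throw ex;
    }
  }
}
```
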
In reply to this post by GitBox
akashrn5 commented on a change in pull request #3871: URL: https://github.com/apache/carbondata/pull/3871#discussion_r488503684 ########## File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java ########## @@ -233,15 +234,24 @@ public String writeMergeIndexFileBasedOnSegmentFile(String segmentId, } if (FileFactory.getCarbonFile(entry.getKey()).equals(FileFactory.getCarbonFile(location))) { segment.getValue().setMergeFileName(mergeIndexFile); - segment.getValue().setFiles(new HashSet<String>()); + mergeIndexFiles + .add(entry.getKey() + CarbonCommonConstants.FILE_SEPARATOR + mergeIndexFile); + segment.getValue().setFiles(new HashSet<>()); break; } } if (table.isHivePartitionTable()) { for (PartitionSpec partitionSpec : partitionSpecs) { if (partitionSpec.getLocation().toString().equals(partitionPath)) { - SegmentFileStore.writeSegmentFile(table.getTablePath(), mergeIndexFile, partitionPath, - segmentId + "_" + uuid + "", partitionSpec.getPartitions(), true); + try { + SegmentFileStore.writeSegmentFile(table.getTablePath(), mergeIndexFile, partitionPath, + segmentId + "_" + uuid + "", partitionSpec.getPartitions(), true); + } catch (Exception ex) { + // delete merge index file if created, + // keep only index files as segment file writing failed Review comment: What I meant to say is: here we can log an error saying that writing the segment file failed, and then throw the exception, because we only get the IOException here and not an exception with any custom message; if we add the error log here, it will be easier for any future analysis. ########## File path: processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java ########## @@ -64,45 +68,85 @@ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable, if (allSegments == null || allSegments.length == 0) { return; } - LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation); - // there is no segment or failed to read tablestatus file. - // so it should stop immediately.
- if (details == null || details.length == 0) { - return; - } - Set<String> metadataSet = new HashSet<>(details.length); - for (LoadMetadataDetails detail : details) { - metadataSet.add(detail.getLoadName()); - } - List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length); - for (CarbonFile segment : allSegments) { - String segmentName = segment.getName(); - // check segment folder pattern - if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) { - String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE); - if (parts.length == 2) { - boolean isOriginal = !parts[1].contains("."); - if (isCompactionFlow) { - // in compaction flow, it should be big segment and segment metadata is not exists - if (!isOriginal && !metadataSet.contains(parts[1])) { - staleSegments.add(segment); - } - } else { - // in loading flow, it should be original segment and segment metadata is not exists - if (isOriginal && !metadataSet.contains(parts[1])) { - staleSegments.add(segment); + int retryCount = CarbonLockUtil + .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK, + CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT); + int maxTimeout = CarbonLockUtil + .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK, + CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT); + ICarbonLock carbonTableStatusLock = CarbonLockFactory + .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK); + try { + if (carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout)) { + LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation); + // there is no segment or failed to read tablestatus file. + // so it should stop immediately. + if (details == null || details.length == 0) { + return; + } + Set<String> metadataSet = new HashSet<>(details.length); + for (LoadMetadataDetails detail : details) { + metadataSet.add(detail.getLoadName()); + } + List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length); + Set<String> staleSegmentsId = new HashSet<>(allSegments.length); + for (CarbonFile segment : allSegments) { + String segmentName = segment.getName(); + // check segment folder pattern + if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) { + String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE); + if (parts.length == 2) { + boolean isOriginal = !parts[1].contains("."); + if (isCompactionFlow) { + // in compaction flow, + // it should be merged segment and segment metadata doesn't exist + if (!isOriginal && !metadataSet.contains(parts[1])) { + staleSegments.add(segment); + staleSegmentsId.add(parts[1]); + } + } else { + // in loading flow, + // it should be original segment and segment metadata doesn't exist + if (isOriginal && !metadataSet.contains(parts[1])) { + staleSegments.add(segment); + staleSegmentsId.add(parts[1]); + } + } } } } + // delete segment folders one by one + for (CarbonFile staleSegment : staleSegments) { + try { + CarbonUtil.deleteFoldersAndFiles(staleSegment); + } catch (IOException | InterruptedException e) { + LOGGER.error("Unable to delete the given path :: " + e.getMessage(), e); + } + } + if (staleSegments.size() > 0) { + // get the segment metadata path + String segmentFilesLocation = + CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath()); + // delete the segment metadata files also + CarbonFile[] staleSegmentMetadataFiles = FileFactory.getCarbonFile(segmentFilesLocation) + .listFiles(file ->
(staleSegmentsId + .contains(file.getName().split(CarbonCommonConstants.UNDERSCORE)[0]))); + for (CarbonFile staleSegmentMetadataFile : staleSegmentMetadataFiles) { + staleSegmentMetadataFile.delete(); + } + } + } else { + String errorMessage = + "Not able to acquire the Table status lock for partial load deletion for table " + + carbonTable.getDatabaseName() + "." + carbonTable.getTableName() + + ", retry compaction"; + LOGGER.error(errorMessage); + if (isCompactionFlow) { Review comment: Here it is a little confusing for me: the message is the same, saying "retry compaction", so why throw only inside the `isCompactionFlow` condition? If we want to throw inside the if condition, then the error message should be different when it is not the compaction flow, right? ########## File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java ########## @@ -251,9 +261,29 @@ public String writeMergeIndexFileBasedOnSegmentFile(String segmentId, String path = CarbonTablePath.getSegmentFilesLocation(table.getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + newSegmentFileName; if (!table.isHivePartitionTable()) { - SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path); - SegmentFileStore.updateTableStatusFile(table, segmentId, newSegmentFileName, + String content = SegmentStatusManager.readFileAsString(path); + try { + SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path); + } catch (Exception ex) { + // delete merge index file if created, Review comment: What I meant to say is: here we can log an error saying that writing the segment file failed, and then throw the exception, because we only get the IOException here and not an exception with any custom message; if we add the error log here, it will be easier for any future analysis. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
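The locking change under discussion replaces an unbounded wait with lockWithRetries(retryCount, maxTimeout), and the review point is that only the compaction flow should fail fast when the lock cannot be taken. A stand-alone sketch of that shape follows, assuming a plain ReentrantLock in place of CarbonData's ICarbonLock and hard-coded stand-ins for the NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK and MAX_TIMEOUT_FOR_CONCURRENT_LOCK properties.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public final class TableStatusLockSketch {

  // Stand-ins for the configurable lock properties read via CarbonLockUtil.
  static final int RETRY_COUNT = 3;
  static final int TIMEOUT_SECONDS = 5;

  static final ReentrantLock TABLE_STATUS_LOCK = new ReentrantLock();

  static void deletePartialLoadDataIfExist(boolean isCompactionFlow)
      throws InterruptedException {
    // lockWithRetries-style acquisition: a bounded number of timed attempts
    // instead of blocking forever on the table status lock.
    boolean locked = false;
    for (int attempt = 0; attempt < RETRY_COUNT && !locked; attempt++) {
      locked = TABLE_STATUS_LOCK.tryLock(TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }
    if (!locked) {
      // Per the review discussion: fail fast in the compaction flow with a
      // "retry compaction" message; in the load flow only log, so the two
      // messages should differ.
      if (isCompactionFlow) {
        throw new RuntimeException(
            "Not able to acquire the table status lock, retry compaction");
      }
      System.err.println(
          "Not able to acquire the table status lock, skipping stale-segment cleanup");
      return;
    }
    try {
      // ... read tablestatus, collect stale segments, delete them ...
    } finally {
      TABLE_STATUS_LOCK.unlock();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    deletePartialLoadDataIfExist(false); // load flow: logs on lock failure
    deletePartialLoadDataIfExist(true);  // compaction flow: would throw
  }
}
```

Failing fast only in the compaction flow matches the reviewer's observation: a "retry compaction" message is meaningful only when an exception actually aborts the compaction, which is why the load-flow message should be worded differently.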
In reply to this post by GitBox
ajantha-bhat commented on a change in pull request #3871: URL: https://github.com/apache/carbondata/pull/3871#discussion_r488511466 ########## File path: processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java ########## @@ -64,45 +68,85 @@ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable, if (allSegments == null || allSegments.length == 0) { return; } - LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation); - // there is no segment or failed to read tablestatus file. - // so it should stop immediately. - if (details == null || details.length == 0) { - return; - } - Set<String> metadataSet = new HashSet<>(details.length); - for (LoadMetadataDetails detail : details) { - metadataSet.add(detail.getLoadName()); - } - List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length); - for (CarbonFile segment : allSegments) { - String segmentName = segment.getName(); - // check segment folder pattern - if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) { - String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE); - if (parts.length == 2) { - boolean isOriginal = !parts[1].contains("."); - if (isCompactionFlow) { - // in compaction flow, it should be big segment and segment metadata is not exists - if (!isOriginal && !metadataSet.contains(parts[1])) { - staleSegments.add(segment); - } - } else { - // in loading flow, it should be original segment and segment metadata is not exists - if (isOriginal && !metadataSet.contains(parts[1])) { - staleSegments.add(segment); + int retryCount = CarbonLockUtil + .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK, + CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT); + int maxTimeout = CarbonLockUtil + .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK, + CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT); + ICarbonLock carbonTableStatusLock = CarbonLockFactory + .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK); + try { + if (carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout)) { + LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation); + // there is no segment or failed to read tablestatus file. + // so it should stop immediately. 
+ if (details == null || details.length == 0) { + return; + } + Set<String> metadataSet = new HashSet<>(details.length); + for (LoadMetadataDetails detail : details) { + metadataSet.add(detail.getLoadName()); + } + List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length); + Set<String> staleSegmentsId = new HashSet<>(allSegments.length); + for (CarbonFile segment : allSegments) { + String segmentName = segment.getName(); + // check segment folder pattern + if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) { + String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE); + if (parts.length == 2) { + boolean isOriginal = !parts[1].contains("."); + if (isCompactionFlow) { + // in compaction flow, + // it should be merged segment and segment metadata doesn't exist + if (!isOriginal && !metadataSet.contains(parts[1])) { + staleSegments.add(segment); + staleSegmentsId.add(parts[1]); + } + } else { + // in loading flow, + // it should be original segment and segment metadata doesn't exist + if (isOriginal && !metadataSet.contains(parts[1])) { + staleSegments.add(segment); + staleSegmentsId.add(parts[1]); + } + } } } } + // delete segment folders one by one + for (CarbonFile staleSegment : staleSegments) { + try { + CarbonUtil.deleteFoldersAndFiles(staleSegment); + } catch (IOException | InterruptedException e) { + LOGGER.error("Unable to delete the given path :: " + e.getMessage(), e); + } + } + if (staleSegments.size() > 0) { + // get the segment metadata path + String segmentFilesLocation = + CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath()); + // delete the segment metadata files also + CarbonFile[] staleSegmentMetadataFiles = FileFactory.getCarbonFile(segmentFilesLocation) + .listFiles(file -> (staleSegmentsId + .contains(file.getName().split(CarbonCommonConstants.UNDERSCORE)[0]))); + for (CarbonFile staleSegmentMetadataFile : staleSegmentMetadataFiles) { + staleSegmentMetadataFile.delete(); + } + } + } else { + String errorMessage = + "Not able to acquire the Table status lock for partial load deletion for table " + + carbonTable.getDatabaseName() + "." + carbonTable.getTableName() + + ", retry compaction"; + LOGGER.error(errorMessage); + if (isCompactionFlow) { Review comment: OK, now the comment is a bit clearer. I will change it. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
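The stale-segment selection logic both sides agree on in the hunk above can be isolated: a segment folder is stale when it exists on disk but has no tablestatus entry, and the compaction flow targets merged segments (id contains a '.') while the load flow targets original segments. Below is a self-contained sketch of just that predicate, assuming SEGMENT_PREFIX is the literal "Segment_"; the folder names and ids in main are made up for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public final class StaleSegmentSketch {

  // Returns segment ids whose folders exist on disk but have no entry in
  // tablestatus. A merged segment id contains '.' (e.g. "0.1"); an original
  // id does not. Compaction cleans merged ones, loading cleans originals.
  static Set<String> findStaleSegmentIds(List<String> segmentFolderNames,
      Set<String> loadNamesInTableStatus, boolean isCompactionFlow) {
    return segmentFolderNames.stream()
        .filter(name -> name.startsWith("Segment_"))
        .map(name -> name.split("_"))
        .filter(parts -> parts.length == 2)
        .map(parts -> parts[1])
        // merged ids in the compaction flow, original ids in the load flow
        .filter(id -> isCompactionFlow == id.contains("."))
        .filter(id -> !loadNamesInTableStatus.contains(id))
        .collect(Collectors.toSet());
  }

  public static void main(String[] args) {
    Set<String> tableStatus = new HashSet<>(Arrays.asList("0", "1", "0.1"));
    // "Segment_2" failed mid-load; "Segment_1.1" failed mid-compaction.
    List<String> onDisk = Arrays.asList("Segment_0", "Segment_1",
        "Segment_0.1", "Segment_2", "Segment_1.1");
    System.out.println(findStaleSegmentIds(onDisk, tableStatus, false)); // [2]
    System.out.println(findStaleSegmentIds(onDisk, tableStatus, true));  // [1.1]
  }
}
```

The same id set then drives deletion of both the segment folder and its metadata files under the segment-files location, which is what the added staleSegmentsId set in the PR is for.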
In reply to this post by GitBox
CarbonDataQA1 commented on pull request #3871: URL: https://github.com/apache/carbondata/pull/3871#issuecomment-692593687 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2337/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |