Posted by
GitBox on
URL: http://apache-carbondata-dev-mailing-list-archive.168.s1.nabble.com/GitHub-carbondata-ShreelekhyaG-opened-a-new-pull-request-3988-WIP-Clean-index-files-when-clean-filesd-tp102150p107508.html
ShreelekhyaG commented on a change in pull request #3988:
URL:
https://github.com/apache/carbondata/pull/3988#discussion_r611626183
##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -163,23 +154,15 @@ public static void writeSegmentFile(String tablePath, String segmentId, String t
* corresponding partitions.
*/
public static void writeSegmentFile(String tablePath, final String taskNo, String location,
- String timeStamp, List<String> partitionNames, boolean isMergeIndexFlow) throws IOException {
+ String timeStamp, List<String> partitionNames) throws IOException {
String tempFolderLoc = timeStamp + ".tmp";
String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
if (!carbonFile.exists()) {
carbonFile.mkdirs();
}
- CarbonFile tempFolder;
- if (isMergeIndexFlow) {
- tempFolder = FileFactory.getCarbonFile(location);
- } else {
- tempFolder = FileFactory
- .getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
- }
-
- if ((tempFolder.exists() && partitionNames.size() > 0) || (isMergeIndexFlow
- && partitionNames.size() > 0)) {
+ CarbonFile tempFolder = FileFactory.getCarbonFile(location);
Review comment:
Here, I had removed the branch of the code that handles the case where mergeIndex is false. But as discussed, we can keep that code as it is for debugging purposes. So I have reverted my changes in the 2nd commit and added comments explaining the logic of the temp folder location.
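The temp folder logic referred to in this comment can be sketched roughly as follows. This is a minimal illustration based only on the removed lines in the hunk above, not the actual CarbonData code: the class and method names are hypothetical, plain strings stand in for CarbonFile, and "/" is assumed to be the value of CarbonCommonConstants.FILE_SEPARATOR.

```java
// Hypothetical sketch: where the index files are looked up for a partition.
final class TempFolderLocationSketch {

  // Assumption: stands in for CarbonCommonConstants.FILE_SEPARATOR.
  private static final String FILE_SEPARATOR = "/";

  /**
   * In the merge index flow the files are read directly from the partition
   * location; otherwise they are read from "<location>/<timeStamp>.tmp".
   */
  static String resolveTempFolder(String location, String timeStamp,
      boolean isMergeIndexFlow) {
    String tempFolderLoc = timeStamp + ".tmp";
    if (isMergeIndexFlow) {
      return location;
    }
    return location + FILE_SEPARATOR + tempFolderLoc;
  }

  public static void main(String[] args) {
    System.out.println(resolveTempFolder("/tbl/p=1", "1618", true));
    System.out.println(resolveTempFolder("/tbl/p=1", "1618", false));
  }
}
```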
##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -199,23 +182,9 @@ public boolean accept(CarbonFile file) {
folderDetails.setRelative(isRelative);
folderDetails.setPartitions(partitionNames);
folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
- for (CarbonFile file : carbonFiles) {
- if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
- folderDetails.setMergeFileName(file.getName());
- } else {
- folderDetails.getFiles().add(file.getName());
- }
- }
+ setIndexFileNamesToFolderDetails(folderDetails, carbonFiles);
segmentFile.addPath(location, folderDetails);
- String path = null;
- if (isMergeIndexFlow) {
- // in case of merge index flow, tasks are launched per partition and all the tasks
- // will be written to the same tmp folder, in that case taskNo is not unique.
- // To generate a unique fileName UUID is used
Review comment:
Done
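The code comment in the hunk above explains why a UUID is needed for the file name in the merge index flow. A minimal sketch of that idea is shown below. The hunk is truncated before the actual statements, so everything here is an assumption for illustration: the class and method names are hypothetical, java.util.UUID stands in for whatever UUID helper the project uses, and ".segment" is an assumed file extension.

```java
import java.util.UUID;

// Hypothetical sketch: choosing a segment file name that stays unique even
// when several tasks write into the same tmp folder.
final class SegmentFileNameSketch {

  // Assumption: placeholder for the project's segment file extension.
  static final String SEGMENT_EXT = ".segment";

  static String segmentFilePath(String writePath, String taskNo,
      boolean isMergeIndexFlow) {
    // In the merge index flow taskNo is not unique per tmp folder,
    // so a random UUID is used as the base name instead.
    String baseName = isMergeIndexFlow ? UUID.randomUUID().toString() : taskNo;
    return writePath + "/" + baseName + SEGMENT_EXT;
  }

  public static void main(String[] args) {
    System.out.println(segmentFilePath("/tbl/segments/1618.tmp", "7", false));
    System.out.println(segmentFilePath("/tbl/segments/1618.tmp", "7", true));
  }
}
```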
##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -440,6 +393,79 @@ public boolean accept(CarbonFile file) {
return null;
}
+ /**
+ * Get old and invalid files which have already been merged to a mergeindex file. In segment folder
+ * we may have both .index files and .mergeindex files, as we are not deleting index files
+ * immediately for old tables, this method reads mergeindex file and adds mapped index files to a
+ * list and returns. If more than one mergeindex file is present, considers the latest one as valid
+ * Ex: We have 3 files in segment. Segment0/ 1.index , 1.mergeindex file, 1.carbondata.
Review comment:
Done
##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -440,6 +393,79 @@ public boolean accept(CarbonFile file) {
return null;
}
+ /**
+ * Get old and invalid files which have already been merged to a mergeindex file. In segment folder
+ * we may have both .index files and .mergeindex files, as we are not deleting index files
+ * immediately for old tables, this method reads mergeindex file and adds mapped index files to a
+ * list and returns. If more than one mergeindex file is present, considers the latest one as valid
+ * Ex: We have 3 files in segment. Segment0/ 1.index , 1.mergeindex file, 1.carbondata.
+ * 1.index is merged to 1.mergeindex. Here it returns merged index file - 1.index.
+ */
+ public static Set<String> getInvalidAndMergedIndexFiles(List<String> indexFiles)
+ throws IOException {
+ SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+ Set<String> mergedAndInvalidIndexFiles = new HashSet<>();
+ long lastModifiedTime = 0L;
+ String validIndexFile = null;
+ List<String> mergeIndexFileNames = new ArrayList<>();
+ boolean isIndexFilesPresent = false;
+ for (String indexFile : indexFiles) {
+ if (indexFile.endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+ isIndexFilesPresent = true;
+ }
+ if (indexFile.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+ // In case there are more than 1 mergeindex files present, latest one is considered as valid
+ // Ex: In SI table, after small files index merge we will have more than 1 mergeindex files
+ long timeStamp =
+ Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(indexFile));
+ if (timeStamp > lastModifiedTime) {
+ lastModifiedTime = timeStamp;
+ validIndexFile = indexFile;
+ }
+ mergeIndexFileNames.add(indexFile);
+ }
+ }
+ // get the invalid mergeindex files by excluding the valid file.
+ if (mergeIndexFileNames.size() > 1 && validIndexFile != null) {
Review comment:
Added comments.
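The selection rule in the loop above (the mergeindex file with the highest timestamp is valid, all other mergeindex files are invalid) can be illustrated with a small standalone sketch. It does not use the CarbonData classes: the class name and the timestampOf helper are hypothetical, and the helper assumes that the timestamp is the number between the last '-' and the last '.' of the file name, which may differ from what CarbonTablePath.DataFileUtil.getTimeStampFromFileName actually does.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: keep the newest mergeindex file, report the rest.
final class LatestMergeIndexSketch {

  // Assumption: file names look like "<prefix>-<timestamp>.<ext>".
  static long timestampOf(String fileName) {
    int hyphen = fileName.lastIndexOf('-');
    int dot = fileName.lastIndexOf('.');
    return Long.parseLong(fileName.substring(hyphen + 1, dot));
  }

  static Set<String> invalidMergeIndexFiles(List<String> files) {
    long latest = 0L;
    String valid = null;
    List<String> mergeIndexFiles = new ArrayList<>();
    for (String file : files) {
      if (file.endsWith(".mergeindex")) {
        long timeStamp = timestampOf(file);
        if (timeStamp > latest) {
          latest = timeStamp;
          valid = file;
        }
        mergeIndexFiles.add(file);
      }
    }
    // Everything except the newest mergeindex file is considered invalid.
    Set<String> invalid = new HashSet<>(mergeIndexFiles);
    if (valid != null) {
      invalid.remove(valid);
    }
    return invalid;
  }
}
```

With the input "a-100.mergeindex", "b-200.mergeindex" and "c-50.index", only "a-100.mergeindex" is reported, because "b-200.mergeindex" is the latest mergeindex file and plain index files are not considered in this step.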
##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -440,6 +393,79 @@ public boolean accept(CarbonFile file) {
return null;
}
+ /**
+ * Get old and invalid files which have already been merged to a mergeindex file. In segment folder
+ * we may have both .index files and .mergeindex files, as we are not deleting index files
+ * immediately for old tables, this method reads mergeindex file and adds mapped index files to a
+ * list and returns. If more than one mergeindex file is present, considers the latest one as valid
+ * Ex: We have 3 files in segment. Segment0/ 1.index , 1.mergeindex file, 1.carbondata.
+ * 1.index is merged to 1.mergeindex. Here it returns merged index file - 1.index.
+ */
+ public static Set<String> getInvalidAndMergedIndexFiles(List<String> indexFiles)
+ throws IOException {
+ SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+ Set<String> mergedAndInvalidIndexFiles = new HashSet<>();
+ long lastModifiedTime = 0L;
+ String validIndexFile = null;
+ List<String> mergeIndexFileNames = new ArrayList<>();
+ boolean isIndexFilesPresent = false;
+ for (String indexFile : indexFiles) {
+ if (indexFile.endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+ isIndexFilesPresent = true;
+ }
+ if (indexFile.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+ // In case there are more than 1 mergeindex files present, latest one is considered as valid
+ // Ex: In SI table, after small files index merge we will have more than 1 mergeindex files
+ long timeStamp =
+ Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(indexFile));
+ if (timeStamp > lastModifiedTime) {
+ lastModifiedTime = timeStamp;
+ validIndexFile = indexFile;
+ }
+ mergeIndexFileNames.add(indexFile);
+ }
+ }
+ // get the invalid mergeindex files by excluding the valid file.
+ if (mergeIndexFileNames.size() > 1 && validIndexFile != null) {
+ final String validIndexFileName = validIndexFile;
+ mergedAndInvalidIndexFiles.addAll(
+ mergeIndexFileNames.stream().filter(file -> !file.equalsIgnoreCase(validIndexFileName))
+ .collect(Collectors.toSet()));
+ }
+ if (isIndexFilesPresent && validIndexFile != null) {
+ indexFileStore.readMergeFile(validIndexFile);
+ Map<String, List<String>> carbonMergeFileToIndexFilesMap =
+ indexFileStore.getCarbonMergeFileToIndexFilesMap();
+ String segmentPath =
+ validIndexFile.substring(0, validIndexFile.lastIndexOf(File.separator) + 1);
+ mergedAndInvalidIndexFiles.addAll(carbonMergeFileToIndexFilesMap.get(validIndexFile).stream()
+ .map(file -> segmentPath + file).collect(Collectors.toSet()));
+ }
+ if (mergeIndexFileNames.size() == 0 && indexFiles.size() > 1) {
+ // if two or more index files are present with different timestamps, then stale/invalid
+ // data is present.
+ Long validFile = indexFiles.stream()
+ .map(file -> Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(file)))
+ .max(Long::compareTo).get();
+ mergedAndInvalidIndexFiles.addAll(
+ indexFiles.stream().filter(file -> !file.contains(validFile.toString()))
+ .collect(Collectors.toSet()));
+ }
+ return mergedAndInvalidIndexFiles;
+ }
+
+ public static CarbonFile[] getValidCarbonIndexFiles(CarbonFile[] carbonFiles) throws IOException {
Review comment:
Modified the method so that it can also be used with other file systems.
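The last branch of getInvalidAndMergedIndexFiles in the hunk above (no mergeindex file, several index files with different timestamps) can be sketched in isolation as follows. This is only an illustration under stated assumptions: the class name and the timestampOf helper are hypothetical, the helper assumes names of the form "<prefix>-<timestamp>.<ext>", and the sketch compares parsed timestamps instead of using String.contains as the diff does.

```java
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch: index files older than the newest timestamp are stale.
final class StaleIndexFilesSketch {

  // Assumption: the timestamp sits between the last '-' and the last '.'.
  static long timestampOf(String fileName) {
    int hyphen = fileName.lastIndexOf('-');
    int dot = fileName.lastIndexOf('.');
    return Long.parseLong(fileName.substring(hyphen + 1, dot));
  }

  static Set<String> staleIndexFiles(List<String> indexFiles) {
    if (indexFiles.size() < 2) {
      // A single index file cannot be stale relative to another one.
      return Collections.emptySet();
    }
    long latest = indexFiles.stream()
        .mapToLong(StaleIndexFilesSketch::timestampOf)
        .max()
        .getAsLong();
    // Files that do not carry the newest timestamp are treated as stale.
    return indexFiles.stream()
        .filter(file -> timestampOf(file) != latest)
        .collect(Collectors.toSet());
  }
}
```

For "0-100.index", "1-100.index" and "0-90.index" the sketch reports only "0-90.index", since the two files stamped 100 belong to the latest load.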
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[hidden email]