CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-790788850 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3350/ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-795102087 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3768/
CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-795103596 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5533/
CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-795238610 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3303/
ShreelekhyaG commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-799169566 retest this please
CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-799257028 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3801/
CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-799257753 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5567/
CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-799663865 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5569/
CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-799665489 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3803/
CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-803623990 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5067/
CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-803641094 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3313/
CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-805645404 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5092/
CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-805648459 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3340/
ShreelekhyaG commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-805661810 retest this please
CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-805746949 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5093/
CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-805753259 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3341/
CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-813201151 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5122/
CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-813203171 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3371/
akashrn5 commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r609339953

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -163,23 +154,15 @@ public static void writeSegmentFile(String tablePath, String segmentId, String t
    * corresponding partitions.
    */
   public static void writeSegmentFile(String tablePath, final String taskNo, String location,

Review comment:
This method is only called for partition tables, so please rename it to `writeSegmentFileForPartitionTable` so that later developers don't get confused.

##########
File path: core/src/main/java/org/apache/carbondata/core/index/IndexStoreManager.java
##########
@@ -511,16 +509,13 @@ public TableSegmentRefresher getTableSegmentRefresher(CarbonTable table) {
       UpdateVO updateVO = SegmentUpdateStatusManager.getInvalidTimestampRange(segment.getLoadMetadataDetails());
       SegmentRefreshInfo segmentRefreshInfo;
+      String segmentFileName = segment.getSegmentFileName();
       if ((updateVO != null && updateVO.getLatestUpdateTimestamp() != null)
-          || segment.getSegmentFileName() != null) {
-        long segmentFileTimeStamp;
-        if (null != segment.getLoadMetadataDetails()) {
-          segmentFileTimeStamp = segment.getLoadMetadataDetails().getLastModifiedTime();
-        } else {
-          segmentFileTimeStamp = FileFactory.getCarbonFile(CarbonTablePath
-              .getSegmentFilePath(table.getTablePath(), segment.getSegmentFileName()))
-              .getLastModifiedTime();
-        }
+          || segmentFileName != null) {
+        // get timestamp value from segment file name.

Review comment:
Please add a detailed comment here so that anyone who changes this later knows the impact. You can mention something like "`Do not use getLastModifiedTime API on segment file carbon file object as it will slow down operation in Object stores like S3. Now the segment file is always written for operations which was overwriting earlier, so this timestamp can be checked always to check whether to refresh the cache or not`". Just try to put this in proper sentences.

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -440,6 +393,79 @@ public boolean accept(CarbonFile file) {
     return null;
   }
+  /**
+   * Get old and invalid files which have already been merged to a mergeindex file.In segment folder
+   * we may have both .index files and .mergeindex files, as we are not deleting index files
+   * immediately for old tables, this method reads mergeindex file and adds mapped index files to a
+   * list and returns.If more than one mergeindex file is present, considers the latest one as valid
+   * Ex: We have 3 files in segment. Segment0/ 1.index , 1.mergeindex file, 1.carbondata.
+   * 1.index is merged to 1.mergeindex. Here it returns merged index file - 1.index.
+   */
+  public static Set<String> getInvalidAndMergedIndexFiles(List<String> indexFiles)
+      throws IOException {
+    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+    Set<String> mergedAndInvalidIndexFiles = new HashSet<>();
+    long lastModifiedTime = 0L;
+    String validIndexFile = null;
+    List<String> mergeIndexFileNames = new ArrayList<>();
+    boolean isIndexFilesPresent = false;
+    for (String indexFile : indexFiles) {
+      if (indexFile.endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+        isIndexFilesPresent = true;
+      }
+      if (indexFile.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        // In case there are more than 1 mergeindex files present, latest one is considered as valid
+        // Ex: In SI table, after small files index merge we will have more than 1 mergeindex files
+        long timeStamp =
+            Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(indexFile));
+        if (timeStamp > lastModifiedTime) {
+          lastModifiedTime = timeStamp;
+          validIndexFile = indexFile;
+        }
+        mergeIndexFileNames.add(indexFile);
+      }
+    }
+    // get the invalid mergeindex files by excluding the valid file.
+    if (mergeIndexFileNames.size() > 1 && validIndexFile != null) {

Review comment:
Basically, three to four scenarios are taken care of in this method, so please mention all of them in the comments.

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -440,6 +393,79 @@ public boolean accept(CarbonFile file) {
     return null;
   }
+  /**
+   * Get old and invalid files which have already been merged to a mergeindex file.In segment folder
+   * we may have both .index files and .mergeindex files, as we are not deleting index files
+   * immediately for old tables, this method reads mergeindex file and adds mapped index files to a
+   * list and returns.If more than one mergeindex file is present, considers the latest one as valid
+   * Ex: We have 3 files in segment. Segment0/ 1.index , 1.mergeindex file, 1.carbondata.
+   * 1.index is merged to 1.mergeindex. Here it returns merged index file - 1.index.
+   */
+  public static Set<String> getInvalidAndMergedIndexFiles(List<String> indexFiles)
+      throws IOException {
+    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+    Set<String> mergedAndInvalidIndexFiles = new HashSet<>();
+    long lastModifiedTime = 0L;
+    String validIndexFile = null;
+    List<String> mergeIndexFileNames = new ArrayList<>();
+    boolean isIndexFilesPresent = false;
+    for (String indexFile : indexFiles) {
+      if (indexFile.endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+        isIndexFilesPresent = true;
+      }
+      if (indexFile.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        // In case there are more than 1 mergeindex files present, latest one is considered as valid
+        // Ex: In SI table, after small files index merge we will have more than 1 mergeindex files
+        long timeStamp =
+            Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(indexFile));
+        if (timeStamp > lastModifiedTime) {
+          lastModifiedTime = timeStamp;
+          validIndexFile = indexFile;
+        }
+        mergeIndexFileNames.add(indexFile);
+      }
+    }
+    // get the invalid mergeindex files by excluding the valid file.
+    if (mergeIndexFileNames.size() > 1 && validIndexFile != null) {
+      final String validIndexFileName = validIndexFile;
+      mergedAndInvalidIndexFiles.addAll(
+          mergeIndexFileNames.stream().filter(file -> !file.equalsIgnoreCase(validIndexFileName))
+              .collect(Collectors.toSet()));
+    }
+    if (isIndexFilesPresent && validIndexFile != null) {
+      indexFileStore.readMergeFile(validIndexFile);
+      Map<String, List<String>> carbonMergeFileToIndexFilesMap =
+          indexFileStore.getCarbonMergeFileToIndexFilesMap();
+      String segmentPath =
+          validIndexFile.substring(0, validIndexFile.lastIndexOf(File.separator) + 1);
+      mergedAndInvalidIndexFiles.addAll(carbonMergeFileToIndexFilesMap.get(validIndexFile).stream()
+          .map(file -> segmentPath + file).collect(Collectors.toSet()));
+    }
+    if (mergeIndexFileNames.size() == 0 && indexFiles.size() > 1) {
+      // if more than two index files present with different timestamps, then stale/invalid
+      // data is present.
+      Long validFile = indexFiles.stream()
+          .map(file -> Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(file)))
+          .max(Long::compareTo).get();
+      mergedAndInvalidIndexFiles.addAll(
+          indexFiles.stream().filter(file -> !file.contains(validFile.toString()))
+              .collect(Collectors.toSet()));
+    }
+    return mergedAndInvalidIndexFiles;
+  }
+
+  public static CarbonFile[] getValidCarbonIndexFiles(CarbonFile[] carbonFiles) throws IOException {

Review comment:
Mention in a comment that this will be used for the local file system and any other default file system; the method can also be renamed. But why are we not using a common method in all places? I can see that for the local file system case we get the valid files, but for HDFS and others we calculate the size directly. That will give wrong info if there are stale files, right?
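The first rule in the quoted `getInvalidAndMergedIndexFiles` hunk (when several mergeindex files exist, the one with the newest timestamp in its name is valid and the others are stale) can be sketched in isolation. This is only an illustration under stated assumptions, not the CarbonData implementation: the class name, the `.mergeindex` suffix, the `<task>_<timestamp>.mergeindex` name layout, and the `timestampOf` helper are hypothetical stand-ins for `CarbonTablePath.MERGE_INDEX_FILE_EXT` and `CarbonTablePath.DataFileUtil.getTimeStampFromFileName`.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class StaleMergeIndexSketch {
  // Illustrative suffix only; the real constant lives in CarbonTablePath.
  static final String MERGE_INDEX_EXT = ".mergeindex";

  // Hypothetical parser: assumes names look like "<task>_<timestamp>.mergeindex".
  static long timestampOf(String fileName) {
    int start = fileName.lastIndexOf('_') + 1;
    int end = fileName.lastIndexOf('.');
    return Long.parseLong(fileName.substring(start, end));
  }

  // Returns every merge index file except the one with the newest timestamp.
  static Set<String> staleMergeIndexFiles(List<String> fileNames) {
    Set<String> stale = new HashSet<>();
    String newest = null;
    long newestTimestamp = Long.MIN_VALUE;
    for (String name : fileNames) {
      if (!name.endsWith(MERGE_INDEX_EXT)) {
        continue; // only merge index files compete for "valid"
      }
      stale.add(name);
      long timestamp = timestampOf(name);
      if (timestamp > newestTimestamp) {
        newestTimestamp = timestamp;
        newest = name;
      }
    }
    stale.remove(newest); // no-op when no merge index file was seen
    return stale;
  }

  public static void main(String[] args) {
    List<String> files =
        Arrays.asList("0_1000.mergeindex", "0_2000.mergeindex", "0_2000.index");
    System.out.println(staleMergeIndexFiles(files));
  }
}
```

A single helper of this shape, called from every file-system code path before sizes are computed, is one way to address the reviewer's concern about stale files being counted on HDFS.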
##########
File path: integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java
##########
@@ -128,11 +130,13 @@ public void commitJob(JobContext jobContext) throws IOException {
       Configuration configuration = jobContext.getConfiguration();
       CarbonLoadModel carbonLoadModel = MapredCarbonOutputFormat.getLoadModel(configuration);
       ThreadLocalSessionInfo.unsetAll();
+      CarbonTable carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
+      String tablePath = carbonTable.getTablePath();
+      new CarbonIndexFileMergeWriter(carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable())
+          .mergeCarbonIndexFilesOfSegment(carbonLoadModel.getSegmentId(), tablePath, false,

Review comment:
Remove line 134, replace `carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable()` with `carbonTable`, and call `getTablePath` at line 136 itself.

##########
File path: integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
##########
@@ -185,13 +184,24 @@ object CarbonMergeFilesRDD {
           val readPath: String = CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + segmentId + "_" + segmentFileNameToSegmentIdMap.get(segmentId) + ".tmp"
+          val uuid = String.valueOf(System.currentTimeMillis)
+          val newSegmentFileName = SegmentFileStore.genSegmentFileName(segmentId, uuid)

Review comment:
Please add a comment here saying not to use the timestamp present in the map, and to generate a new one instead of overwriting.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
##########
@@ -217,8 +216,16 @@ object SecondaryIndexUtil {
       }
       if (finalMergeStatus) {
         if (null != mergeStatus && mergeStatus.length != 0) {
+          // Segment file is not yet written during SI load, then we can delete old index
+          // files immediately. If segment file is already present and SI refresh is triggered, then
+          // do not delete immediately to avoid failures during parallel queries.
           val validSegmentsToUse = validSegments.asScala
-            .filter(segment => mergeStatus.map(_._2).toSet.contains(segment.getSegmentNo))
+            .filter(segment => {

Review comment:
Here, do we need to check whether this comes from the load or the rebuild command, to avoid immediate deletion?

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
##########
@@ -341,22 +350,6 @@ case class CarbonAddLoadCommand(
         carbonTable, segment, partitionSpecOp.orNull, partitionDataFiles.asJava)
     }
-    // This event will trigger merge index job, only trigger it if it is carbon file
-    if (isCarbonFormat) {
-      operationContext.setProperty(
-        carbonTable.getTableUniqueName + "_Segment",
-        model.getSegmentId)
-      // when this event is triggered, SI load listener will be called for all the SI tables under
-      // this main table, no need to load the SI tables for add load command, so add this property
-      // to check in SI load event listener to avoid loading to SI.
-      operationContext.setProperty("isAddLoad", "true")
-      val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =

Review comment:
Why is this code removed? Is the merge code no longer in the listener? The `isAddLoad` and other operation context properties are also removed.

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -440,6 +393,79 @@ public boolean accept(CarbonFile file) {
     return null;
   }
+  /**
+   * Get old and invalid files which have already been merged to a mergeindex file.In segment folder
+   * we may have both .index files and .mergeindex files, as we are not deleting index files
+   * immediately for old tables, this method reads mergeindex file and adds mapped index files to a
+   * list and returns.If more than one mergeindex file is present, considers the latest one as valid
+   * Ex: We have 3 files in segment. Segment0/ 1.index , 1.mergeindex file, 1.carbondata.

Review comment:
Can you please rewrite the comment in a better-formatted way? You can refer to the javadoc in any of the source Java code; just make it a paragraph with proper punctuation.

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -440,6 +393,79 @@ public boolean accept(CarbonFile file) {
     return null;
   }
+  /**
+   * Get old and invalid files which have already been merged to a mergeindex file.In segment folder
+   * we may have both .index files and .mergeindex files, as we are not deleting index files
+   * immediately for old tables, this method reads mergeindex file and adds mapped index files to a
+   * list and returns.If more than one mergeindex file is present, considers the latest one as valid
+   * Ex: We have 3 files in segment. Segment0/ 1.index , 1.mergeindex file, 1.carbondata.
+   * 1.index is merged to 1.mergeindex. Here it returns merged index file - 1.index.
+   */
+  public static Set<String> getInvalidAndMergedIndexFiles(List<String> indexFiles)
+      throws IOException {
+    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+    Set<String> mergedAndInvalidIndexFiles = new HashSet<>();
+    long lastModifiedTime = 0L;
+    String validIndexFile = null;
+    List<String> mergeIndexFileNames = new ArrayList<>();
+    boolean isIndexFilesPresent = false;
+    for (String indexFile : indexFiles) {
+      if (indexFile.endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+        isIndexFilesPresent = true;
+      }
+      if (indexFile.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        // In case there are more than 1 mergeindex files present, latest one is considered as valid
+        // Ex: In SI table, after small files index merge we will have more than 1 mergeindex files
+        long timeStamp =

Review comment:
```suggestion
        long indexFileTimeStamp =
```

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -163,23 +154,15 @@ public static void writeSegmentFile(String tablePath, String segmentId, String t
    * corresponding partitions.
    */
   public static void writeSegmentFile(String tablePath, final String taskNo, String location,
-      String timeStamp, List<String> partitionNames, boolean isMergeIndexFlow) throws IOException {
+      String timeStamp, List<String> partitionNames) throws IOException {
     String tempFolderLoc = timeStamp + ".tmp";
     String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
     CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
     if (!carbonFile.exists()) {
       carbonFile.mkdirs();
     }
-    CarbonFile tempFolder;
-    if (isMergeIndexFlow) {
-      tempFolder = FileFactory.getCarbonFile(location);
-    } else {
-      tempFolder = FileFactory
-          .getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
-    }
-
-    if ((tempFolder.exists() && partitionNames.size() > 0) || (isMergeIndexFlow
-        && partitionNames.size() > 0)) {
+    CarbonFile tempFolder = FileFactory.getCarbonFile(location);

Review comment:
Can you please briefly explain here the logic of the temp folder when writing the segment file for a partition table?

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -661,6 +687,20 @@ public boolean accept(CarbonFile file) {
         }
       });
       if (listFiles != null && listFiles.length > 0) {
+        Set<String> mergedAndInvalidIndexFiles = getInvalidAndMergedIndexFiles(
+            Arrays.stream(listFiles).map(file -> file.getAbsolutePath())
+                .collect(Collectors.toList()));
+        // Delete index files that are merged.
+        for (CarbonFile indexFile : listFiles) {
+          if (mergedAndInvalidIndexFiles.contains(indexFile.getAbsolutePath())) {
+            indexFile.delete();
+          }
+        }
+        if (!mergedAndInvalidIndexFiles.isEmpty()) {
+          listFiles = Arrays.stream(listFiles)

Review comment:
```suggestion
          carbonIndexFiles = Arrays.stream(listFiles)
```

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -199,23 +182,9 @@ public boolean accept(CarbonFile file) {
       folderDetails.setRelative(isRelative);
       folderDetails.setPartitions(partitionNames);
       folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
-      for (CarbonFile file : carbonFiles) {
-        if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
-          folderDetails.setMergeFileName(file.getName());
-        } else {
-          folderDetails.getFiles().add(file.getName());
-        }
-      }
+      setIndexFileNamesToFolderDetails(folderDetails, carbonFiles);
       segmentFile.addPath(location, folderDetails);
-      String path = null;
-      if (isMergeIndexFlow) {
-        // in case of merge index flow, tasks are launched per partition and all the tasks
-        // will be written to the same tmp folder, in that case taskNo is not unique.
-        // To generate a unique fileName UUID is used

Review comment:
Please add this comment back. Take care not to delete comments unless they are of no use, as they make things easier for future developers and during debugging.

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -661,6 +687,20 @@ public boolean accept(CarbonFile file) {
         }

Review comment:
`CarbonFile[] listFiles = carbonFile.listFiles(file -> CarbonTablePath.isCarbonIndexFile(file.getAbsolutePath()));`, use a lambda.
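The "use a lambda" request above is the usual anonymous-class-to-lambda refactor for a single-method filter interface. The sketch below shows the same change with the JDK's `java.io.File.listFiles(FileFilter)`, used here only as a stand-in for `CarbonFile.listFiles`; the class name, the `demo` helper, and the `.carbonindex` suffix check are assumptions made for illustration.

```java
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class ListFilesLambdaSketch {
  // Illustrative suffix; CarbonData keeps the real one in CarbonTablePath.
  static final String INDEX_EXT = ".carbonindex";

  // Before: anonymous inner class implementing the single-method filter.
  static File[] listWithAnonymousClass(File dir) {
    return dir.listFiles(new FileFilter() {
      @Override
      public boolean accept(File file) {
        return file.getName().endsWith(INDEX_EXT);
      }
    });
  }

  // After: the same filter as a lambda. The cast only makes the chosen
  // listFiles overload explicit for the reader.
  static File[] listWithLambda(File dir) {
    return dir.listFiles((FileFilter) file -> file.getName().endsWith(INDEX_EXT));
  }

  // Creates a scratch directory holding one index file and one data file,
  // then returns how many files each variant finds.
  static int[] demo() {
    try {
      Path dir = Files.createTempDirectory("segment");
      Files.createFile(dir.resolve("part-0" + INDEX_EXT));
      Files.createFile(dir.resolve("part-0.carbondata"));
      return new int[] {
          listWithAnonymousClass(dir.toFile()).length,
          listWithLambda(dir.toFile()).length
      };
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    int[] counts = demo();
    System.out.println(counts[0] + " " + counts[1]);
  }
}
```

Both variants select the same files; the lambda form simply removes the boilerplate around the one-line predicate.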
##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
##########
@@ -135,15 +139,24 @@ private void prepareLoadMetadata() {
       } else {
         segName = segment.getSegmentFileName();
       }
-      List<String> index = snapShot.get(segName);
-      if (null == index) {
-        index = new LinkedList<>();
+      List<String> indexFiles = snapShot.get(segName);
+      if (null == indexFiles) {
+        indexFiles = new LinkedList<>();
       }
-      for (String indexPath : index) {
-        if (indexPath.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
-          indexFileStore.put(indexPath, indexPath.substring(indexPath.lastIndexOf('/') + 1));
+      Set<String> mergedIndexFiles =
+          SegmentFileStore.getInvalidAndMergedIndexFiles(indexFiles);
+      if (!mergedIndexFiles.isEmpty()) {
+        // do not include already merged indexFiles files details.
+        indexFiles = indexFiles.stream().filter(

Review comment:
Assign the filtered list to a new variable and call it `filteredIndexFiles`.

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1341,13 +1375,13 @@ public static void removeTempFolder(Map<String, FolderDetails> locationMap, Stri
   public static Set<String> getIndexFilesListForSegment(Segment segment, String tablePath)

Review comment:
I think you can move this method to SecondaryIndexUtil as private, as it's used only for that case.
Don't keep it in a common place.

##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
##########
@@ -79,13 +82,23 @@ public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
   @Override
   public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException {
     Map<String, String> indexFiles;
-    if (segment.getSegmentFileName() == null) {
+    SegmentFileStore fileStore = null;
+    if (segment.getSegmentFileName() != null) {
+      fileStore = new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
+    }
+    if (segment.getSegmentFileName() == null || fileStore.getSegmentFile() == null) {
       String path = CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
       indexFiles = new SegmentIndexFileStore().getMergeOrIndexFilesFromSegment(path);
+      List<String> indexFileList = new ArrayList<>(indexFiles.keySet());
+      Set<String> mergedIndexFiles = SegmentFileStore.getInvalidAndMergedIndexFiles(indexFileList);
+      for (String indexFile : indexFileList) {

Review comment:
Replace the for loop with a lambda function and do the filtering.

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -661,6 +687,20 @@ public boolean accept(CarbonFile file) {
         }
       });
       if (listFiles != null && listFiles.length > 0) {
+        Set<String> mergedAndInvalidIndexFiles = getInvalidAndMergedIndexFiles(
+            Arrays.stream(listFiles).map(file -> file.getAbsolutePath())

Review comment:
`Arrays.stream(listFiles).map(CarbonFile::getAbsolutePath)`

##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
##########
@@ -135,15 +139,24 @@ private void prepareLoadMetadata() {
       } else {
         segName = segment.getSegmentFileName();
       }
-      List<String> index = snapShot.get(segName);
-      if (null == index) {
-        index = new LinkedList<>();
+      List<String> indexFiles = snapShot.get(segName);
+      if (null == indexFiles) {
+        indexFiles = new LinkedList<>();
       }
-      for (String indexPath : index) {
-        if (indexPath.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
-          indexFileStore.put(indexPath, indexPath.substring(indexPath.lastIndexOf('/') + 1));
+      Set<String> mergedIndexFiles =
+          SegmentFileStore.getInvalidAndMergedIndexFiles(indexFiles);
+      if (!mergedIndexFiles.isEmpty()) {
+        // do not include already merged indexFiles files details.
+        indexFiles = indexFiles.stream().filter(
+            file -> !mergedIndexFiles.contains(file))
+            .collect(Collectors.toList());
+      }
+      for (String indexFile : indexFiles) {
+        if (indexFile.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+          indexFileStore

Review comment:
If it contains a mergeindex file, then there will be only one merge index file per segment, so there is no need to loop over all of them, right? We can just break out of the loop.

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##########
@@ -491,19 +491,8 @@ object CarbonDataRDDFactory {
         val segmentMetaDataInfo = CommonLoadUtils.getSegmentMetaDataInfoFromAccumulator(
           carbonLoadModel.getSegmentId,
           segmentMetaDataAccumulator)
-        val segmentFileName =
-          SegmentFileStore.writeSegmentFile(carbonTable, carbonLoadModel.getSegmentId,
-            String.valueOf(carbonLoadModel.getFactTimeStamp), segmentMetaDataInfo)
-        // clear segmentMetaDataAccumulator
         segmentMetaDataAccumulator.reset()
-        SegmentFileStore.updateTableStatusFile(

Review comment:
This method should be called after writing the segment file, right? I don't see it in the changes. Or does the method at line 507 already take care of it?
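Two suggestions from earlier in this review, assigning the filtered list to a new variable named `filteredIndexFiles` and stopping at the first merge index file rather than visiting every entry, can be sketched together. This is a minimal illustration, not the code in the PR: the class and method names and the `.mergeindex` suffix are hypothetical, and `findFirst` is used here as one way to express the "break out of the loop" idea.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class FilteredIndexFilesSketch {
  // Illustrative suffix only; the real constant lives in CarbonTablePath.
  static final String MERGE_INDEX_EXT = ".mergeindex";

  // Leaves the input list untouched and returns the survivors as a new list.
  static List<String> filterOutMerged(List<String> indexFiles, Set<String> mergedIndexFiles) {
    return indexFiles.stream()
        .filter(file -> !mergedIndexFiles.contains(file))
        .collect(Collectors.toList());
  }

  // Short-circuits at the first merge index file instead of scanning the rest.
  static Optional<String> firstMergeIndex(List<String> filteredIndexFiles) {
    return filteredIndexFiles.stream()
        .filter(file -> file.endsWith(MERGE_INDEX_EXT))
        .findFirst();
  }

  public static void main(String[] args) {
    List<String> indexFiles = Arrays.asList("1.index", "1.mergeindex");
    Set<String> mergedIndexFiles = Collections.singleton("1.index");
    List<String> filteredIndexFiles = filterOutMerged(indexFiles, mergedIndexFiles);
    System.out.println(filteredIndexFiles);
    System.out.println(firstMergeIndex(filteredIndexFiles).orElse("none"));
  }
}
```

Keeping the original `indexFiles` reference unmodified makes it clear at the call site which list still contains merged entries and which does not.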
########## File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ########## @@ -661,6 +687,20 @@ public boolean accept(CarbonFile file) { } }); if (listFiles != null && listFiles.length > 0) { + Set<String> mergedAndInvalidIndexFiles = getInvalidAndMergedIndexFiles( + Arrays.stream(listFiles).map(file -> file.getAbsolutePath()) + .collect(Collectors.toList())); + // Delete index files that are merged. + for (CarbonFile indexFile : listFiles) { + if (mergedAndInvalidIndexFiles.contains(indexFile.getAbsolutePath())) { + indexFile.delete(); Review comment: here we delete the files immediately, no need to wait for the clean file to do it? This won't lead to query failure? if not ,,please add a comment in code to explain why we can delete here immediately and not other places ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala ########## @@ -131,7 +125,14 @@ object Compactor { secondaryIndexModel.segmentIdToLoadStartTimeMapping, indexCarbonTable, loadMetadataDetails.toList.asJava, carbonLoadModelForMergeDataFiles)(sqlContext) - + if (rebuiltSegments.isEmpty) { + for (eachSegment <- secondaryIndexModel.validSegments) { + SegmentFileStore + .writeSegmentFile(indexCarbonTable, Review comment: just confirm if any of the segment segment file writing fails, what happens ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala ########## @@ -410,6 +379,24 @@ object SecondaryIndexCreator { indexTable, secondaryIndexModel.sqlContext.sparkSession) } + if (rebuiltSegments.isEmpty) { + for (loadMetadata <- loadMetadataDetails) { + SegmentFileStore + .writeSegmentFile(indexCarbonTable, loadMetadata.getLoadName, Review comment: same as above ########## File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java ########## @@ -95,28 +97,27 @@ private String 
mergeCarbonIndexFilesOfSegment(String segmentId,
               indexFilesInPartition.add(indexCarbonFile);
             }
           }
-          indexFiles = indexFilesInPartition.toArray(new CarbonFile[indexFilesInPartition.size()]);
+          indexFiles = indexFilesInPartition;
         } else {
-          indexFiles = indexCarbonFiles.toArray(new CarbonFile[indexCarbonFiles.size()]);
+          indexFiles = indexCarbonFiles;
         }
-      } else {
-        indexFiles = SegmentIndexFileStore
-            .getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration(), uuid);
       }
-    if (isCarbonIndexFilePresent(indexFiles) || indexFileNamesTobeAdded != null) {
-      if (sfs == null) {
-        return writeMergeIndexFileBasedOnSegmentFolder(indexFileNamesTobeAdded,
-            readFileFooterFromCarbonDataFile, segmentPath, indexFiles, segmentId, uuid);
-      } else {
-        return writeMergeIndexFileBasedOnSegmentFile(segmentId, indexFileNamesTobeAdded, sfs,
-            indexFiles, uuid, partitionPath);
+    if (sfs == null || indexFiles.isEmpty()) {
+      if (table.isHivePartitionTable()) {
+        segmentPath = partitionPath;
       }
+      return writeMergeIndexFileBasedOnSegmentFolder(indexFileNamesTobeAdded,
+          isOldStoreIndexFilesPresent, segmentPath, segmentId, uuid, true);
+    } else {
+      return writeMergeIndexFileBasedOnSegmentFile(segmentId, indexFileNamesTobeAdded,
+          isOldStoreIndexFilesPresent, sfs,
+          indexFiles.toArray(new CarbonFile[indexFiles.size()]), uuid, partitionPath);

Review comment:
```suggestion
          indexFiles.toArray(new CarbonFile[0]), uuid, partitionPath);
```

##########
File path: index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergeindex/CarbonIndexFileMergeTestCaseWithSI.scala
##########
@@ -246,19 +246,22 @@ class CarbonIndexFileMergeTestCaseWithSI
       CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
   }
-  private def getIndexFileCount(tableName: String, segment: String): Int = {
+  private def getIndexFileCount(tableName: String,
+      segment: String,
+      extension: String = CarbonTablePath.INDEX_FILE_EXT): Int = {
     val table = CarbonMetadata.getInstance().getCarbonTable(tableName)
     val path =
CarbonTablePath
       .getSegmentPath(table.getAbsoluteTableIdentifier.getTablePath, segment)
     val carbonFiles = FileFactory.getCarbonFile(path).listFiles(new CarbonFileFilter {
-      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(CarbonTablePath
-        .INDEX_FILE_EXT)
+      override def accept(file: CarbonFile): Boolean = {
+        file.getName.endsWith(CarbonTablePath.INDEX_FILE_EXT) ||

Review comment:
   I think this code is also present in other test classes; please refactor it, move it to a test util class, and reuse it.

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
##########
@@ -530,21 +615,28 @@ class CarbonIndexFileMergeTestCase
         FileFactory.getCarbonFile(table.getAbsoluteTableIdentifier.getTablePath)
           .listFiles(true, new CarbonFileFilter {
             override def accept(file: CarbonFile): Boolean = {
-              file.getName.endsWith(extension)
+              file.getName.endsWith(CarbonTablePath.INDEX_FILE_EXT) ||
+              file.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)
             }
           })
       } else {
         FileFactory.getCarbonFile(path).listFiles(true, new CarbonFileFilter {
           override def accept(file: CarbonFile): Boolean = {
-            file.getName.endsWith(extension)
+            file.getName.endsWith(CarbonTablePath.INDEX_FILE_EXT) ||
+            file.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)
           }
         })
       }
-    if (carbonFiles != null) {
-      carbonFiles.size()
-    } else {
-      0
-    }
+    var validIndexFiles = SegmentFileStore.getValidCarbonIndexFiles(carbonFiles.asScala.toArray)
+    validIndexFiles = validIndexFiles.toStream
+      .filter(file => file.getName.endsWith(extension)).toArray
+    validIndexFiles.length
+  }
+
+  def getSegmentFileCount(tableName: String): Int = {

Review comment:
   Please check whether similar code is written in other test cases; if so, you can move it to a test util.
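As a rough illustration of the refactoring asked for in the two test review comments above, the shared counting logic could live in one helper that every test class calls. The sketch below is hypothetical: `IndexFileCountSketch` and its methods are not CarbonData classes, the extension constants are assumed to match `CarbonTablePath.INDEX_FILE_EXT` and `CarbonTablePath.MERGE_INDEX_FILE_EXT`, and it works on plain file names rather than `CarbonFile` objects. It also leaves out the `SegmentFileStore.getValidCarbonIndexFiles` step that the PR's test applies between the two filters.

```java
import java.util.List;

// Hypothetical shared test helper for illustration only; not part of CarbonData.
final class IndexFileCountSketch {
  // Assumed to match CarbonTablePath.INDEX_FILE_EXT and MERGE_INDEX_FILE_EXT.
  static final String INDEX_FILE_EXT = ".carbonindex";
  static final String MERGE_INDEX_FILE_EXT = ".carbonindexmerge";

  // Same predicate as the CarbonFileFilter in the diff: index or merge index files.
  static boolean isIndexOrMergeIndex(String fileName) {
    return fileName.endsWith(INDEX_FILE_EXT) || fileName.endsWith(MERGE_INDEX_FILE_EXT);
  }

  // Counts the index-related files that end with the requested extension.
  static int countByExtension(List<String> fileNames, String extension) {
    return (int) fileNames.stream()
        .filter(IndexFileCountSketch::isIndexOrMergeIndex)
        .filter(name -> name.endsWith(extension))
        .count();
  }
}
```

Keeping the predicate and the count in one place means a later change to what counts as a valid index file only has to be made once.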
##########
File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
##########
@@ -184,30 +185,49 @@ private String mergeCarbonIndexFilesOfSegment(String segmentId,
     return indexLocationMap;
   }
-  private String writeMergeIndexFileBasedOnSegmentFolder(List<String> indexFileNamesTobeAdded,
-      boolean readFileFooterFromCarbonDataFile, String segmentPath, CarbonFile[] indexFiles,
-      String segmentId, String uuid) throws IOException {
+  public String writeMergeIndexFileBasedOnSegmentFolder(List<String> indexFileNamesTobeAdded,
+      boolean isOldStoreIndexFilesPresent, String segmentPath,
+      String segmentId, String uuid, boolean readBasedOnUUID) throws IOException {
+    CarbonFile[] indexFiles = null;
     SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
-    if (readFileFooterFromCarbonDataFile) {
+    if (isOldStoreIndexFilesPresent) {
       // this case will be used in case of upgrade where old store will not have the blocklet
       // info in the index file and therefore blocklet info need to be read from the file footer
       // in the carbondata file
-      fileStore.readAllIndexAndFillBlockletInfo(segmentPath, uuid);
+      fileStore.readAllIndexAndFillBlockletInfo(segmentPath, null);

Review comment:
   Why is null being sent here?

##########
File path: processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
##########
@@ -1163,6 +1166,20 @@ public static String mergeIndexFilesInPartitionedSegment(CarbonTable table, Stri
         tempFolderPath, currPartitionSpec);
   }
+  public static String mergeIndexFilesInTempSegment(CarbonTable table, String segmentId,

Review comment:
   Please add a method-level comment: what is this temp folder and when will it be used?
##########
File path: processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
##########
@@ -1145,13 +1145,16 @@ public static void addIndexSizeIntoMetaEntry(LoadMetadataDetails loadMetadataDet
    * @param table
    * @param segmentId
    * @param uuid
+   * @paran partitionPath

Review comment:
   if not giving any explanation for parameters, no need to doc it

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -439,6 +430,73 @@ public boolean accept(CarbonFile file) {
     return null;
   }
+  /**
+   * Get old and invalid files which have already been merged to a mergeindex file.In segment folder
+   * we may have both .index files and .mergeindex files, as we are not deleting index files
+   * immediately for old tables, this method reads mergeindex file and adds mapped index files to a
+   * list and returns.If more than one mergeindex file is present, considers the latest one as valid
+   * Ex: We have 3 files in segment. Segment0/ 1.index , 1.mergeindex file, 1.carbondata.
+   * 1.index is merged to 1.mergeindex. Here it returns merged index file - 1.index.
+   */
+  public static Set<String> getInvalidAndMergedIndexFiles(List<String> indexFiles)
+      throws IOException {
+    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();

Review comment:
   please update the same in comment of the method
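The behaviour described in the Javadoc of `getInvalidAndMergedIndexFiles` can be sketched as below, purely to make the rule concrete. This is not the PR's implementation: `MergedIndexFilterSketch` and `MergeIndexInfo` are hypothetical, the real method reads the merge index files itself through `SegmentIndexFileStore`, and choosing the "latest" merge index file by a numeric timestamp is an assumption made here for illustration.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model for illustration only; not part of CarbonData.
final class MergedIndexFilterSketch {

  // One merge index file: its path, an assumed timestamp, and the names of
  // the index files that were merged into it.
  static final class MergeIndexInfo {
    final String path;
    final long timestamp;
    final Set<String> mergedIndexFileNames;

    MergeIndexInfo(String path, long timestamp, Set<String> mergedIndexFileNames) {
      this.path = path;
      this.timestamp = timestamp;
      this.mergedIndexFileNames = mergedIndexFileNames;
    }
  }

  // Returns index files already covered by the latest merge index file,
  // plus any older merge index files, which are treated as invalid.
  static Set<String> invalidAndMergedFiles(List<String> indexFilePaths,
      List<MergeIndexInfo> mergeIndexFiles) {
    Set<String> result = new HashSet<>();
    MergeIndexInfo latest = null;
    for (MergeIndexInfo info : mergeIndexFiles) {
      if (latest == null || info.timestamp > latest.timestamp) {
        latest = info;
      }
    }
    if (latest == null) {
      return result; // no merge index file, so nothing is merged or invalid
    }
    for (MergeIndexInfo info : mergeIndexFiles) {
      if (info != latest) {
        result.add(info.path); // older merge index file, considered invalid
      }
    }
    for (String path : indexFilePaths) {
      String name = path.substring(path.lastIndexOf('/') + 1);
      if (latest.mergedIndexFileNames.contains(name)) {
        result.add(path); // index file already merged into the latest merge index
      }
    }
    return result;
  }
}
```

With the Javadoc's own example (Segment0 containing 1.index and 1.mergeindex, where 1.index is merged into 1.mergeindex), this sketch returns only the path of 1.index.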
In reply to this post by GitBox
CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-817568368 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3398/