CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-817571406 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5149/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-817691096 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5151/
CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-817700778 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3399/
ShreelekhyaG commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-817705864 retest this please
CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-817764115 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3405/
CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-817769205 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5157/
ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r611625449

########## File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ##########
@@ -439,6 +430,73 @@ public boolean accept(CarbonFile file) {
     return null;
   }
+  /**
+   * Get old and invalid files which have already been merged to a mergeindex file.In segment folder
+   * we may have both .index files and .mergeindex files, as we are not deleting index files
+   * immediately for old tables, this method reads mergeindex file and adds mapped index files to a
+   * list and returns.If more than one mergeindex file is present, considers the latest one as valid
+   * Ex: We have 3 files in segment. Segment0/ 1.index , 1.mergeindex file, 1.carbondata.
+   * 1.index is merged to 1.mergeindex. Here it returns merged index file - 1.index.
+   */
+  public static Set<String> getInvalidAndMergedIndexFiles(List<String> indexFiles)
+      throws IOException {
+    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();

Review comment: done
ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r611625785

########## File path: core/src/main/java/org/apache/carbondata/core/index/IndexStoreManager.java ##########
@@ -511,16 +509,13 @@ public TableSegmentRefresher getTableSegmentRefresher(CarbonTable table) {
       UpdateVO updateVO = SegmentUpdateStatusManager.getInvalidTimestampRange(segment.getLoadMetadataDetails());
       SegmentRefreshInfo segmentRefreshInfo;
+      String segmentFileName = segment.getSegmentFileName();
       if ((updateVO != null && updateVO.getLatestUpdateTimestamp() != null)
-          || segment.getSegmentFileName() != null) {
-        long segmentFileTimeStamp;
-        if (null != segment.getLoadMetadataDetails()) {
-          segmentFileTimeStamp = segment.getLoadMetadataDetails().getLastModifiedTime();
-        } else {
-          segmentFileTimeStamp = FileFactory.getCarbonFile(CarbonTablePath
-              .getSegmentFilePath(table.getTablePath(), segment.getSegmentFileName()))
-              .getLastModifiedTime();
-        }
+          || segmentFileName != null) {
+        // get timestamp value from segment file name.

Review comment: Done
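The hunk above drops the file-system lookup of the last-modified time in favour of reading the timestamp from the segment file name. A minimal sketch of that idea follows. It is not the PR's code: the class and method names are hypothetical, and the name layout `<segmentId>_<timestamp>.segment` is an assumption made for illustration.

```java
final class SegmentFileNameSketch {

  private SegmentFileNameSketch() {
  }

  /**
   * Extracts the timestamp from a segment file name.
   * Assumed (not confirmed by this thread) layout: "<segmentId>_<timestamp>.segment",
   * for example "2_1618200000000.segment".
   */
  static long timestampFromSegmentFileName(String segmentFileName) {
    int underscore = segmentFileName.lastIndexOf('_');
    int dot = segmentFileName.lastIndexOf('.');
    if (underscore < 0 || dot <= underscore) {
      throw new IllegalArgumentException("Unexpected segment file name: " + segmentFileName);
    }
    // The digits between the last '_' and the extension are taken as the timestamp.
    return Long.parseLong(segmentFileName.substring(underscore + 1, dot));
  }
}
```

Parsing the name avoids a file-system call per segment, which is presumably the motivation for the change; the trade-off is that the name format becomes part of the contract.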
ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r611625926

########## File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ##########
@@ -163,23 +154,15 @@ public static void writeSegmentFile(String tablePath, String segmentId, String t
    * corresponding partitions.
    */
   public static void writeSegmentFile(String tablePath, final String taskNo, String location,

Review comment: Done
ShreelekhyaG commented on a change in pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#discussion_r611626183 ########## File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ########## @@ -163,23 +154,15 @@ public static void writeSegmentFile(String tablePath, String segmentId, String t * corresponding partitions. */ public static void writeSegmentFile(String tablePath, final String taskNo, String location, - String timeStamp, List<String> partitionNames, boolean isMergeIndexFlow) throws IOException { + String timeStamp, List<String> partitionNames) throws IOException { String tempFolderLoc = timeStamp + ".tmp"; String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc; CarbonFile carbonFile = FileFactory.getCarbonFile(writePath); if (!carbonFile.exists()) { carbonFile.mkdirs(); } - CarbonFile tempFolder; - if (isMergeIndexFlow) { - tempFolder = FileFactory.getCarbonFile(location); - } else { - tempFolder = FileFactory - .getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc); - } - - if ((tempFolder.exists() && partitionNames.size() > 0) || (isMergeIndexFlow - && partitionNames.size() > 0)) { + CarbonFile tempFolder = FileFactory.getCarbonFile(location); Review comment: Here, I have actually removed the part of the code where mergeIndex is false. But as discussed, for debug purposes we can keep the code as it is. So, I have reverted my changes in 2nd commit and added comments for the logic of the temp folder location. 
########## File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ########## @@ -199,23 +182,9 @@ public boolean accept(CarbonFile file) { folderDetails.setRelative(isRelative); folderDetails.setPartitions(partitionNames); folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage()); - for (CarbonFile file : carbonFiles) { - if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { - folderDetails.setMergeFileName(file.getName()); - } else { - folderDetails.getFiles().add(file.getName()); - } - } + setIndexFileNamesToFolderDetails(folderDetails, carbonFiles); segmentFile.addPath(location, folderDetails); - String path = null; - if (isMergeIndexFlow) { - // in case of merge index flow, tasks are launched per partition and all the tasks - // will be written to the same tmp folder, in that case taskNo is not unique. - // To generate a unique fileName UUID is used Review comment: Done ########## File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ########## @@ -440,6 +393,79 @@ public boolean accept(CarbonFile file) { return null; } + /** + * Get old and invalid files which have already been merged to a mergeindex file.In segment folder + * we may have both .index files and .mergeindex files, as we are not deleting index files + * immediately for old tables, this method reads mergeindex file and adds mapped index files to a + * list and returns.If more than one mergeindex file is present, considers the latest one as valid + * Ex: We have 3 files in segment. Segment0/ 1.index , 1.mergeindex file, 1.carbondata. 
Review comment: Done ########## File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ########## @@ -440,6 +393,79 @@ public boolean accept(CarbonFile file) { return null; } + /** + * Get old and invalid files which have already been merged to a mergeindex file.In segment folder + * we may have both .index files and .mergeindex files, as we are not deleting index files + * immediately for old tables, this method reads mergeindex file and adds mapped index files to a + * list and returns.If more than one mergeindex file is present, considers the latest one as valid + * Ex: We have 3 files in segment. Segment0/ 1.index , 1.mergeindex file, 1.carbondata. + * 1.index is merged to 1.mergeindex. Here it returns merged index file - 1.index. + */ + public static Set<String> getInvalidAndMergedIndexFiles(List<String> indexFiles) + throws IOException { + SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore(); + Set<String> mergedAndInvalidIndexFiles = new HashSet<>(); + long lastModifiedTime = 0L; + String validIndexFile = null; + List<String> mergeIndexFileNames = new ArrayList<>(); + boolean isIndexFilesPresent = false; + for (String indexFile : indexFiles) { + if (indexFile.endsWith(CarbonTablePath.INDEX_FILE_EXT)) { + isIndexFilesPresent = true; + } + if (indexFile.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { + // In case there are more than 1 mergeindex files present, latest one is considered as valid + // Ex: In SI table, after small files index merge we will have more than 1 mergeindex files + long timeStamp = + Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(indexFile)); + if (timeStamp > lastModifiedTime) { + lastModifiedTime = timeStamp; + validIndexFile = indexFile; + } + mergeIndexFileNames.add(indexFile); + } + } + // get the invalid mergeindex files by excluding the valid file. 
+ if (mergeIndexFileNames.size() > 1 && validIndexFile != null) { Review comment: added comments ########## File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ########## @@ -440,6 +393,79 @@ public boolean accept(CarbonFile file) { return null; } + /** + * Get old and invalid files which have already been merged to a mergeindex file.In segment folder + * we may have both .index files and .mergeindex files, as we are not deleting index files + * immediately for old tables, this method reads mergeindex file and adds mapped index files to a + * list and returns.If more than one mergeindex file is present, considers the latest one as valid + * Ex: We have 3 files in segment. Segment0/ 1.index , 1.mergeindex file, 1.carbondata. + * 1.index is merged to 1.mergeindex. Here it returns merged index file - 1.index. + */ + public static Set<String> getInvalidAndMergedIndexFiles(List<String> indexFiles) + throws IOException { + SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore(); + Set<String> mergedAndInvalidIndexFiles = new HashSet<>(); + long lastModifiedTime = 0L; + String validIndexFile = null; + List<String> mergeIndexFileNames = new ArrayList<>(); + boolean isIndexFilesPresent = false; + for (String indexFile : indexFiles) { + if (indexFile.endsWith(CarbonTablePath.INDEX_FILE_EXT)) { + isIndexFilesPresent = true; + } + if (indexFile.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { + // In case there are more than 1 mergeindex files present, latest one is considered as valid + // Ex: In SI table, after small files index merge we will have more than 1 mergeindex files + long timeStamp = + Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(indexFile)); + if (timeStamp > lastModifiedTime) { + lastModifiedTime = timeStamp; + validIndexFile = indexFile; + } + mergeIndexFileNames.add(indexFile); + } + } + // get the invalid mergeindex files by excluding the valid file. 
+ if (mergeIndexFileNames.size() > 1 && validIndexFile != null) { + final String validIndexFileName = validIndexFile; + mergedAndInvalidIndexFiles.addAll( + mergeIndexFileNames.stream().filter(file -> !file.equalsIgnoreCase(validIndexFileName)) + .collect(Collectors.toSet())); + } + if (isIndexFilesPresent && validIndexFile != null) { + indexFileStore.readMergeFile(validIndexFile); + Map<String, List<String>> carbonMergeFileToIndexFilesMap = + indexFileStore.getCarbonMergeFileToIndexFilesMap(); + String segmentPath = + validIndexFile.substring(0, validIndexFile.lastIndexOf(File.separator) + 1); + mergedAndInvalidIndexFiles.addAll(carbonMergeFileToIndexFilesMap.get(validIndexFile).stream() + .map(file -> segmentPath + file).collect(Collectors.toSet())); + } + if (mergeIndexFileNames.size() == 0 && indexFiles.size() > 1) { + // if more than two index files present with different timestamps, then stale/invalid + // data is present. + Long validFile = indexFiles.stream() + .map(file -> Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(file))) + .max(Long::compareTo).get(); + mergedAndInvalidIndexFiles.addAll( + indexFiles.stream().filter(file -> !file.contains(validFile.toString())) + .collect(Collectors.toSet())); + } + return mergedAndInvalidIndexFiles; + } + + public static CarbonFile[] getValidCarbonIndexFiles(CarbonFile[] carbonFiles) throws IOException { Review comment: modified the method to be used for other file systems also. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
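The rule stated in the quoted Javadoc (when more than one mergeindex file is present, the latest one is considered valid) can be sketched in isolation. This is a minimal illustration, not the PR's implementation: the class name, the `.mergeindex` extension constant and the `timestampOf` helper are hypothetical, and the sketch assumes the timestamp is the run of digits between the last `_` and the file extension.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class StaleMergeIndexSketch {

  // Placeholder extension used only in this sketch.
  static final String MERGE_EXT = ".mergeindex";

  private StaleMergeIndexSketch() {
  }

  // Hypothetical helper: assumes names such as "0_100.mergeindex".
  static long timestampOf(String fileName) {
    int start = fileName.lastIndexOf('_') + 1;
    int end = fileName.lastIndexOf('.');
    return Long.parseLong(fileName.substring(start, end));
  }

  /** Returns every mergeindex file except the one with the highest timestamp. */
  static Set<String> staleMergeIndexFiles(List<String> files) {
    String latest = null;
    long latestTimestamp = Long.MIN_VALUE;
    List<String> mergeFiles = new ArrayList<>();
    for (String file : files) {
      if (!file.endsWith(MERGE_EXT)) {
        continue; // plain index files are ignored in this sketch
      }
      mergeFiles.add(file);
      long timestamp = timestampOf(file);
      if (timestamp > latestTimestamp) {
        latestTimestamp = timestamp;
        latest = file;
      }
    }
    Set<String> stale = new HashSet<>(mergeFiles);
    if (latest != null) {
      stale.remove(latest); // the newest mergeindex file stays valid
    }
    return stale;
  }
}
```

With a single mergeindex file the result is empty, which matches the `size() > 1` guard in the quoted diff.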
ShreelekhyaG commented on a change in pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#discussion_r611627327 ########## File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ########## @@ -661,6 +687,20 @@ public boolean accept(CarbonFile file) { } }); if (listFiles != null && listFiles.length > 0) { + Set<String> mergedAndInvalidIndexFiles = getInvalidAndMergedIndexFiles( + Arrays.stream(listFiles).map(file -> file.getAbsolutePath()) + .collect(Collectors.toList())); + // Delete index files that are merged. + for (CarbonFile indexFile : listFiles) { + if (mergedAndInvalidIndexFiles.contains(indexFile.getAbsolutePath())) { + indexFile.delete(); Review comment: `getSegmentFileForPhysicalDataPartitions ` method is called for only Alter add hive partition flow and as it is not old store, it can be deleted. We are triggering `alterTableMergeIndexEvent` in this flow, the index files are not deleted immediately after merge index because `isOldStoreIndexFilesPresent` flag is set true in this event. I didn't want to change the flag for Alter add hive partition flow as we may actually add old store data(store <= 1.1 version) and it has to read all blocklet info from the file footer of carbondata file. 
########## File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ########## @@ -1341,13 +1375,13 @@ public static void removeTempFolder(Map<String, FolderDetails> locationMap, Stri public static Set<String> getIndexFilesListForSegment(Segment segment, String tablePath) Review comment: Done ########## File path: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java ########## @@ -135,15 +139,24 @@ private void prepareLoadMetadata() { } else { segName = segment.getSegmentFileName(); } - List<String> index = snapShot.get(segName); - if (null == index) { - index = new LinkedList<>(); + List<String> indexFiles = snapShot.get(segName); + if (null == indexFiles) { + indexFiles = new LinkedList<>(); } - for (String indexPath : index) { - if (indexPath.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { - indexFileStore.put(indexPath, indexPath.substring(indexPath.lastIndexOf('/') + 1)); + Set<String> mergedIndexFiles = + SegmentFileStore.getInvalidAndMergedIndexFiles(indexFiles); + if (!mergedIndexFiles.isEmpty()) { + // do not include already merged indexFiles files details. + indexFiles = indexFiles.stream().filter( Review comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
ShreelekhyaG commented on a change in pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#discussion_r611627693 ########## File path: core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java ########## @@ -79,13 +82,23 @@ public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier, @Override public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException { Map<String, String> indexFiles; - if (segment.getSegmentFileName() == null) { + SegmentFileStore fileStore = null; + if (segment.getSegmentFileName() != null) { + fileStore = new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName()); + } + if (segment.getSegmentFileName() == null || fileStore.getSegmentFile() == null) { String path = CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo()); indexFiles = new SegmentIndexFileStore().getMergeOrIndexFilesFromSegment(path); + List<String> indexFileList = new ArrayList<>(indexFiles.keySet()); + Set<String> mergedIndexFiles = SegmentFileStore.getInvalidAndMergedIndexFiles(indexFileList); + for (String indexFile : indexFileList) { Review comment: done ########## File path: integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java ########## @@ -128,11 +130,13 @@ public void commitJob(JobContext jobContext) throws IOException { Configuration configuration = jobContext.getConfiguration(); CarbonLoadModel carbonLoadModel = MapredCarbonOutputFormat.getLoadModel(configuration); ThreadLocalSessionInfo.unsetAll(); + CarbonTable carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable(); + String tablePath = carbonTable.getTablePath(); + new CarbonIndexFileMergeWriter(carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable()) + .mergeCarbonIndexFilesOfSegment(carbonLoadModel.getSegmentId(), tablePath, false, Review comment: Done -- This is an automated message from the Apache Git Service. 
ShreelekhyaG commented on a change in pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#discussion_r611627895 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ########## @@ -491,19 +491,8 @@ object CarbonDataRDDFactory { val segmentMetaDataInfo = CommonLoadUtils.getSegmentMetaDataInfoFromAccumulator( carbonLoadModel.getSegmentId, segmentMetaDataAccumulator) - val segmentFileName = - SegmentFileStore.writeSegmentFile(carbonTable, carbonLoadModel.getSegmentId, - String.valueOf(carbonLoadModel.getFactTimeStamp), segmentMetaDataInfo) - // clear segmentMetaDataAccumulator segmentMetaDataAccumulator.reset() - SegmentFileStore.updateTableStatusFile( Review comment: Yes, method at line 507 takes care of updating status file after segment file writing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
ShreelekhyaG commented on a change in pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#discussion_r611628066 ########## File path: integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala ########## @@ -185,13 +184,24 @@ object CarbonMergeFilesRDD { val readPath: String = CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + segmentId + "_" + segmentFileNameToSegmentIdMap.get(segmentId) + ".tmp" + val uuid = String.valueOf(System.currentTimeMillis) + val newSegmentFileName = SegmentFileStore.genSegmentFileName(segmentId, uuid) Review comment: Done ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala ########## @@ -131,7 +125,14 @@ object Compactor { secondaryIndexModel.segmentIdToLoadStartTimeMapping, indexCarbonTable, loadMetadataDetails.toList.asJava, carbonLoadModelForMergeDataFiles)(sqlContext) - + if (rebuiltSegments.isEmpty) { + for (eachSegment <- secondaryIndexModel.validSegments) { + SegmentFileStore + .writeSegmentFile(indexCarbonTable, Review comment: the compaction/load query fails with an exception. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r611628424

########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala ##########
@@ -217,8 +216,16 @@ object SecondaryIndexUtil {
       }
       if (finalMergeStatus) {
         if (null != mergeStatus && mergeStatus.length != 0) {
+          // Segment file is not yet written during SI load, then we can delete old index
+          // files immediately. If segment file is already present and SI refresh is triggered, then
+          // do not delete immediately to avoid failures during parallel queries.
           val validSegmentsToUse = validSegments.asScala
-            .filter(segment => mergeStatus.map(_._2).toSet.contains(segment.getSegmentNo))
+            .filter(segment => {

Review comment: We can either check, or make it the default behavior for the load/rebuild command. As reading the segment file can be a costly operation, I have changed it to `isFileExist`.
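The decision described in the comment above comes down to a cheap existence check instead of reading and parsing the segment file. A minimal sketch under stated assumptions: it uses `java.nio.file.Files.exists` as a stand-in for the `isFileExist` call mentioned in the reply, and the class and method names are hypothetical.

```java
import java.nio.file.Files;
import java.nio.file.Path;

final class SegmentFileCheckSketch {

  private SegmentFileCheckSketch() {
  }

  /**
   * Old index files may be deleted right away only when no segment file has been
   * written yet (the SI load case in the quoted comment). If the segment file already
   * exists (the SI refresh case), deletion is deferred so that queries running in
   * parallel do not fail on missing files.
   */
  static boolean canDeleteOldIndexFilesImmediately(Path segmentFile) {
    // Existence check only: the file is never opened or parsed.
    return !Files.exists(segmentFile);
  }
}
```

The check costs one metadata lookup per segment, whereas reading the segment file would also require opening and deserializing it.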
ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r611628742

########## File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java ##########
@@ -184,30 +185,49 @@ private String mergeCarbonIndexFilesOfSegment(String segmentId,
     return indexLocationMap;
   }
-  private String writeMergeIndexFileBasedOnSegmentFolder(List<String> indexFileNamesTobeAdded,
-      boolean readFileFooterFromCarbonDataFile, String segmentPath, CarbonFile[] indexFiles,
-      String segmentId, String uuid) throws IOException {
+  public String writeMergeIndexFileBasedOnSegmentFolder(List<String> indexFileNamesTobeAdded,
+      boolean isOldStoreIndexFilesPresent, String segmentPath,
+      String segmentId, String uuid, boolean readBasedOnUUID) throws IOException {
+    CarbonFile[] indexFiles = null;
     SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
-    if (readFileFooterFromCarbonDataFile) {
+    if (isOldStoreIndexFilesPresent) {
       // this case will be used in case of upgrade where old store will not have the blocklet
       // info in the index file and therefore blocklet info need to be read from the file footer
       // in the carbondata file
-      fileStore.readAllIndexAndFillBlockletInfo(segmentPath, uuid);
+      fileStore.readAllIndexAndFillBlockletInfo(segmentPath, null);

Review comment: `readAllIndexAndFillBlockletInfo` is used when an old store is present, and this old store may not always match the uuid. In the addHivePartition flow, we can have index files with different timestamps.
ShreelekhyaG commented on a change in pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#discussion_r611629016 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala ########## @@ -341,22 +350,6 @@ case class CarbonAddLoadCommand( carbonTable, segment, partitionSpecOp.orNull, partitionDataFiles.asJava) } - // This event will trigger merge index job, only trigger it if it is carbon file - if (isCarbonFormat) { - operationContext.setProperty( - carbonTable.getTableUniqueName + "_Segment", - model.getSegmentId) - // when this event is triggered, SI load listener will be called for all the SI tables under - // this main table, no need to load the SI tables for add load command, so add this property - // to check in SI load event listener to avoid loading to SI. - operationContext.setProperty("isAddLoad", "true") - val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent = Review comment: I'm directly calling mergeIndex( using `mergeIndexFilesInAddLoadSegment`) here instead of the listener. So, no need of setting `isAddLoadProperty`. And I have removed this property from SI listener also. 
########## File path: index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergeindex/CarbonIndexFileMergeTestCaseWithSI.scala ########## @@ -246,19 +246,22 @@ class CarbonIndexFileMergeTestCaseWithSI CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD) } - private def getIndexFileCount(tableName: String, segment: String): Int = { + private def getIndexFileCount(tableName: String, + segment: String, + extension: String = CarbonTablePath.INDEX_FILE_EXT): Int = { val table = CarbonMetadata.getInstance().getCarbonTable(tableName) val path = CarbonTablePath .getSegmentPath(table.getAbsoluteTableIdentifier.getTablePath, segment) val carbonFiles = FileFactory.getCarbonFile(path).listFiles(new CarbonFileFilter { - override def accept(file: CarbonFile): Boolean = file.getName.endsWith(CarbonTablePath - .INDEX_FILE_EXT) + override def accept(file: CarbonFile): Boolean = { + file.getName.endsWith(CarbonTablePath.INDEX_FILE_EXT) || Review comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r611629414

########## File path: processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ##########
@@ -1163,6 +1166,20 @@ public static String mergeIndexFilesInPartitionedSegment(CarbonTable table, Stri
         tempFolderPath, currPartitionSpec);
   }
+  public static String mergeIndexFilesInTempSegment(CarbonTable table, String segmentId,

Review comment: Refactored the method name and added a comment.
akashrn5 commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r612948684

########## File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ##########
@@ -419,56 +425,66 @@ public boolean accept(CarbonFile file) {
   }
   /**
-   * Get old and invalid files which have already been merged to a mergeindex file.In segment folder
-   * we may have both .index files and .mergeindex files, as we are not deleting index files
-   * immediately for old tables, this method reads mergeindex file and adds mapped index files to a
-   * list and returns.If more than one mergeindex file is present, considers the latest one as valid
-   * Ex: We have 3 files in segment. Segment0/ 1.index , 1.mergeindex file, 1.carbondata.
-   * 1.index is merged to 1.mergeindex. Here it returns merged index file - 1.index.
+   * Get stale and invalid index files that have already been merged.
+   * As we are not deleting index files immediately after updating old tables,
+   * we will have old index files in segment folder.
+   * This method is called in following scenarios:
+   * 1. During read, when segment file is not present or gets deleted.
+   * 2. When writing segment index size in tablestatus file.
    */
   public static Set<String> getInvalidAndMergedIndexFiles(List<String> indexFiles)
       throws IOException {
     SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
     Set<String> mergedAndInvalidIndexFiles = new HashSet<>();
     long lastModifiedTime = 0L;
-    String validIndexFile = null;
+    String validMergeIndexFile = null;
     List<String> mergeIndexFileNames = new ArrayList<>();
     boolean isIndexFilesPresent = false;
     for (String indexFile : indexFiles) {
       if (indexFile.endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
         isIndexFilesPresent = true;
       }
       if (indexFile.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
-        // In case there are more than 1 mergeindex files present, latest one is considered as valid
-        // Ex: In SI table, after small files index merge we will have more than 1 mergeindex files
-        long timeStamp =
+        long indexFileTimeStamp =
             Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(indexFile));
-        if (timeStamp > lastModifiedTime) {
-          lastModifiedTime = timeStamp;
-          validIndexFile = indexFile;
+        // In case there are more than 1 mergeindex files present, latest one is considered as valid
+        if (indexFileTimeStamp > lastModifiedTime) {
+          lastModifiedTime = indexFileTimeStamp;
+          validMergeIndexFile = indexFile;
         }
         mergeIndexFileNames.add(indexFile);
       }
     }
-    // get the invalid mergeindex files by excluding the valid file.
-    if (mergeIndexFileNames.size() > 1 && validIndexFile != null) {
-      final String validIndexFileName = validIndexFile;
+    // Possible scenarios where we have >1 merge index file:
+    // 1. SI load when MT has stale index files. As during SI load MT segment file is not updated,
+    // we directly read from segment directory.
+    // 2. In SI table, after small files index merge(refresh command) we will have more than one
+    // mergeindex file as we are not deleting old file immediately. If segment file gets deleted,
+    // then it reads from segment directory.
+    if (mergeIndexFileNames.size() > 1 && validMergeIndexFile != null) {
+      final String validIndexFileName = validMergeIndexFile;
+      // get the invalid mergeindex files by excluding the valid file.
       mergedAndInvalidIndexFiles.addAll(
           mergeIndexFileNames.stream().filter(file -> !file.equalsIgnoreCase(validIndexFileName))
               .collect(Collectors.toSet()));
     }
-    if (isIndexFilesPresent && validIndexFile != null) {
-      indexFileStore.readMergeFile(validIndexFile);
+    // If both index and merge index files are present, read the valid merge index file and add its
+    // mapped index files to the excluding list.
+    // Example: We have xxx.index and xxx.mergeindex files in a segment directory.
+    // Here the merged index file (xxx.index) is considered invalid.

Review comment: I think this line is confusing. Shouldn't it be `Here the index file (xxx.index) is considered invalid.`?

########## File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ##########
@@ -105,7 +105,8 @@ public SegmentFileStore(String tablePath, String segmentFileName) throws IOExcep
    */
   public static void writeSegmentFile(String tablePath, final String taskNo, String location,

Review comment: I guess the main method name can also be changed, as internally it is always called to write the segment file for a partition table.

########## File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ##########
@@ -184,7 +201,15 @@ public boolean accept(CarbonFile file) {
     folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
     setIndexFileNamesToFolderDetails(folderDetails, carbonFiles);
     segmentFile.addPath(location, folderDetails);
-    String path = writePath + "/" + CarbonUtil.generateUUID() + CarbonTablePath.SEGMENT_EXT;
+    String path = null;
+    if (isMergeIndexFlow) {

Review comment: I think it's better to add comments in all the places where merge index is false, saying that the code is used for developer debugging purposes, so that it won't be confused with the actual flow.
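The code comment under review ("read the valid merge index file and add its mapped index files to the excluding list") describes a small mapping step that can be shown without CarbonData classes. The sketch below is an illustration under assumptions, not the PR's code: a plain `Map` stands in for the mapping read from the merge index file, the class and method names are hypothetical, and `/` is used as the path separator to keep the example deterministic.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class MergedIndexExclusionSketch {

  private MergedIndexExclusionSketch() {
  }

  /**
   * Given the path of the valid merge index file and a map from merge index path to
   * the names of the index files it contains, returns the full paths of the index
   * files that are already covered and can therefore be excluded.
   */
  static Set<String> indexFilesCoveredBy(String validMergeIndexPath,
      Map<String, List<String>> mergeFileToIndexFiles) {
    // Keep the directory part, including the trailing separator.
    String segmentPath =
        validMergeIndexPath.substring(0, validMergeIndexPath.lastIndexOf('/') + 1);
    Set<String> covered = new HashSet<>();
    List<String> names =
        mergeFileToIndexFiles.getOrDefault(validMergeIndexPath, Collections.emptyList());
    for (String indexFileName : names) {
      covered.add(segmentPath + indexFileName);
    }
    return covered;
  }
}
```

For the example in the comment, a segment holding `xxx.index` and `xxx.mergeindex` would yield the path of `xxx.index`, which is the reading the reviewer's suggested wording makes explicit.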