CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-819425587
Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3425/
CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-819428997
Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5177/
ShreelekhyaG commented on a change in pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#discussion_r613146679

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########

@@ -419,56 +425,66 @@ public boolean accept(CarbonFile file) {
   }

   /**
-   * Get old and invalid files which have already been merged to a mergeindex file.In segment folder
-   * we may have both .index files and .mergeindex files, as we are not deleting index files
-   * immediately for old tables, this method reads mergeindex file and adds mapped index files to a
-   * list and returns.If more than one mergeindex file is present, considers the latest one as valid
-   * Ex: We have 3 files in segment. Segment0/ 1.index , 1.mergeindex file, 1.carbondata.
-   * 1.index is merged to 1.mergeindex. Here it returns merged index file - 1.index.
+   * Get stale and invalid index files that have already been merged.
+   * As we are not deleting index files immediately after updating old tables,
+   * we will have old index files in segment folder.
+   * This method is called in following scenarios:
+   * 1. During read, when segment file is not present or gets deleted.
+   * 2. When writing segment index size in tablestatus file.
    */
   public static Set<String> getInvalidAndMergedIndexFiles(List<String> indexFiles)
       throws IOException {
     SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
     Set<String> mergedAndInvalidIndexFiles = new HashSet<>();
     long lastModifiedTime = 0L;
-    String validIndexFile = null;
+    String validMergeIndexFile = null;
     List<String> mergeIndexFileNames = new ArrayList<>();
     boolean isIndexFilesPresent = false;
     for (String indexFile : indexFiles) {
       if (indexFile.endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
         isIndexFilesPresent = true;
       }
       if (indexFile.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
-        // In case there are more than 1 mergeindex files present, latest one is considered as valid
-        // Ex: In SI table, after small files index merge we will have more than 1 mergeindex files
-        long timeStamp =
+        long indexFileTimeStamp =
             Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(indexFile));
-        if (timeStamp > lastModifiedTime) {
-          lastModifiedTime = timeStamp;
-          validIndexFile = indexFile;
+        // In case there are more than 1 mergeindex files present, latest one is considered as valid
+        if (indexFileTimeStamp > lastModifiedTime) {
+          lastModifiedTime = indexFileTimeStamp;
+          validMergeIndexFile = indexFile;
         }
         mergeIndexFileNames.add(indexFile);
       }
     }
-    // get the invalid mergeindex files by excluding the valid file.
-    if (mergeIndexFileNames.size() > 1 && validIndexFile != null) {
-      final String validIndexFileName = validIndexFile;
+    // Possible scenarios where we have >1 merge index file:
+    // 1. SI load when MT has stale index files. As during SI load MT segment file is not updated,
+    //    we directly read from segment directory.
+    // 2. In SI table, after small files index merge(refresh command) we will have more than one
+    //    mergeindex file as we are not deleting old file immediately. If segment file gets deleted,
+    //    then it reads from segment directory.
+    if (mergeIndexFileNames.size() > 1 && validMergeIndexFile != null) {
+      final String validIndexFileName = validMergeIndexFile;
+      // get the invalid mergeindex files by excluding the valid file.
       mergedAndInvalidIndexFiles.addAll(
           mergeIndexFileNames.stream().filter(file -> !file.equalsIgnoreCase(validIndexFileName))
               .collect(Collectors.toSet()));
     }
-    if (isIndexFilesPresent && validIndexFile != null) {
-      indexFileStore.readMergeFile(validIndexFile);
+    // If both index and merge index files are present, read the valid merge index file and add its
+    // mapped index files to the excluding list.
+    // Example: We have xxx.index and xxx.mergeindex files in a segment directory.
+    // Here the merged index file (xxx.index) is considered invalid.

Review comment: ok, done

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########

@@ -105,7 +105,8 @@ public SegmentFileStore(String tablePath, String segmentFileName) throws IOExcep
    */
   public static void writeSegmentFile(String tablePath, final String taskNo, String location,

Review comment: done
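To make the selection rule in the diff above easier to follow, here is a minimal Scala sketch of the same idea: among several merge index files, the one whose file name carries the latest timestamp is treated as valid, and the remaining merge index files are collected as stale. The `.mergeindex` suffix and the timestamp parsing below are simplified placeholders for CarbonData's `CarbonTablePath` helpers, not the real API.

```scala
// Sketch only (not the CarbonData implementation): keep the merge index file
// with the newest timestamp in its name and report the rest as stale.
object MergeIndexSelection {

  // Placeholder extension; CarbonData uses CarbonTablePath.MERGE_INDEX_FILE_EXT.
  private val MergeIndexExt = ".mergeindex"

  // Assumes names like "<timestamp>.mergeindex"; CarbonData reads the timestamp
  // via CarbonTablePath.DataFileUtil.getTimeStampFromFileName.
  private def timestampOf(fileName: String): Long =
    fileName.stripSuffix(MergeIndexExt).split('_').last.toLong

  /** Returns (valid merge index file, stale merge index files). */
  def splitValidAndStale(indexFiles: Seq[String]): (Option[String], Set[String]) = {
    val mergeIndexFiles = indexFiles.filter(_.endsWith(MergeIndexExt))
    if (mergeIndexFiles.isEmpty) {
      (None, Set.empty)
    } else {
      // Latest merge index file wins, exactly as in the diff's comment.
      val valid = mergeIndexFiles.maxBy(timestampOf)
      (Some(valid), mergeIndexFiles.filterNot(_ == valid).toSet)
    }
  }
}
```

For example, `splitValidAndStale(Seq("1.index", "100.mergeindex", "200.mergeindex"))` yields `(Some("200.mergeindex"), Set("100.mergeindex"))`, matching the "latest one is considered as valid" rule. The real method additionally reads the valid merge index file and marks the index files mapped inside it as invalid, which this sketch does not cover.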
ShreelekhyaG commented on a change in pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#discussion_r613146871

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########

@@ -184,7 +201,15 @@ public boolean accept(CarbonFile file) {
       folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
       setIndexFileNamesToFolderDetails(folderDetails, carbonFiles);
       segmentFile.addPath(location, folderDetails);
-      String path = writePath + "/" + CarbonUtil.generateUUID() + CarbonTablePath.SEGMENT_EXT;
+      String path = null;
+      if (isMergeIndexFlow) {

Review comment: added comments
akashrn5 commented on a change in pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#discussion_r613191589

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
##########

@@ -231,73 +240,36 @@ object SecondaryIndexUtil {
       } else {
         siRebuildRDD.partitions.foreach { partition =>
           val carbonSparkPartition = partition.asInstanceOf[CarbonSparkPartition]
-          deleteOldCarbonDataFiles(carbonSparkPartition)
+          deleteOldCarbonDataFiles(carbonSparkPartition, validSegmentsToUse.toList)
         }
       }
+      val segmentToLoadStartTimeMap: scala.collection.mutable.Map[String, java.lang.Long] =
+        scala.collection.mutable.Map()
+      // merge index files and write segment file for merged segments
       mergedSegments.asScala.map { seg =>
+        val segmentPath = CarbonTablePath.getSegmentPath(tablePath, seg.getLoadName)
+        try {
+          new CarbonIndexFileMergeWriter(indexCarbonTable)
+            .writeMergeIndexFileBasedOnSegmentFolder(null, false, segmentPath,
+              seg.getLoadName, carbonLoadModel.getFactTimeStamp.toString,
+              true)
+        } catch {
+          case e: IOException =>
+            val message =
+              "Failed to merge index files in path: " + segmentPath + ": " + e.getMessage()

Review comment: it's a Scala class, so you can use `s"..... $variable ...."` for string concatenation
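As a quick reference for the interpolation idiom the reviewer is pointing at, here is a tiny self-contained Scala snippet; the path and exception below are made-up placeholder values, not taken from the PR.

```scala
// Illustration only: Java-style concatenation versus Scala string interpolation.
object MessageFormatting {
  def main(args: Array[String]): Unit = {
    // Placeholder values for the sake of a runnable example.
    val segmentPath = "/store/db/table/Fact/Part0/Segment_0"
    val e = new java.io.IOException("failed to write merge index")

    // Java-style concatenation, as in the original diff:
    val concatenated = "Failed to merge index files in path: " + segmentPath + ": " + e.getMessage()

    // Scala string interpolation, as the reviewer suggests:
    val interpolated = s"Failed to merge index files in path: $segmentPath. ${e.getMessage()}"

    println(concatenated)
    println(interpolated)
  }
}
```

The interpolated form is the idiomatic Scala style; the exact wording is settled in the suggestion further down the thread.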
CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-819617133
Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3428/
CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-819619347
Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5180/
ShreelekhyaG commented on a change in pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#discussion_r613754424

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
##########

Review comment: Done
akashrn5 commented on a change in pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#discussion_r614594602

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
##########

@@ -231,73 +240,36 @@ object SecondaryIndexUtil {
       } else {
         siRebuildRDD.partitions.foreach { partition =>
           val carbonSparkPartition = partition.asInstanceOf[CarbonSparkPartition]
-          deleteOldCarbonDataFiles(carbonSparkPartition)
+          deleteOldCarbonDataFiles(carbonSparkPartition, validSegmentsToUse.toList)
         }
       }
+      val segmentToLoadStartTimeMap: scala.collection.mutable.Map[String, java.lang.Long] =
+        scala.collection.mutable.Map()
+      // merge index files and write segment file for merged segments
       mergedSegments.asScala.map { seg =>
+        val segmentPath = CarbonTablePath.getSegmentPath(tablePath, seg.getLoadName)
+        try {
+          new CarbonIndexFileMergeWriter(indexCarbonTable)
+            .writeMergeIndexFileBasedOnSegmentFolder(null, false, segmentPath,
+              seg.getLoadName, carbonLoadModel.getFactTimeStamp.toString,
+              true)
+        } catch {
+          case e: IOException =>
+            val message =
+              s"Failed to merge index files in path: $segmentPath. " + e.getMessage()

Review comment:
```suggestion
              s"Failed to merge index files in path: $segmentPath. ${e.getMessage()} "
```
akashrn5 commented on a change in pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#discussion_r614596183

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
##########

@@ -353,16 +342,38 @@ object SecondaryIndexUtil {
     }
   }

+  /**
+   * This method returns the list of index/merge index files for a segment in carbonTable.
+   */
+  @throws[IOException]
+  private def getIndexFilesListForSegment(segment: Segment, tablePath: String): util.Set[String] = {
+    var indexFiles : util.Set[String] = new util.HashSet[String]
+    val segmentFileStore = new SegmentFileStore(tablePath,
+      segment.getSegmentFileName)
+    val segmentPath = CarbonTablePath.getSegmentPath(tablePath, segment.getSegmentNo)
+    if (segmentFileStore.getSegmentFile == null) {
+      indexFiles = new SegmentIndexFileStore()
+        .getMergeOrIndexFilesFromSegment(segmentPath).keySet
+    }
+    else {

Review comment: move this to the above line and correct the style
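For readers skimming the thread, a brief sketch of the brace style being requested: the `else` goes on the same line as the closing brace, and since Scala's `if`/`else` is an expression, the mutable `indexFiles` variable from the diff is not strictly needed either. The method below uses simplified placeholder parameters rather than CarbonData's `Segment`/`SegmentFileStore` types.

```scala
// Style sketch only, with placeholder parameters instead of the CarbonData types.
// Note `} else {` on one line, and the if/else used directly as the return value.
object SegmentIndexFileLookup {
  def indexFilesForSegment(segmentFileMissing: Boolean,
      filesFromSegmentDir: => Set[String],
      filesFromSegmentFile: => Set[String]): Set[String] = {
    if (segmentFileMissing) {
      filesFromSegmentDir
    } else {
      filesFromSegmentFile
    }
  }
}
```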
akashrn5 commented on a change in pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#discussion_r614596639

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
##########

@@ -353,16 +342,38 @@ object SecondaryIndexUtil {
     }
   }

+  /**
+   * This method returns the list of index/merge index files for a segment in carbonTable.
+   */
+  @throws[IOException]
+  private def getIndexFilesListForSegment(segment: Segment, tablePath: String): util.Set[String] = {
+    var indexFiles : util.Set[String] = new util.HashSet[String]
+    val segmentFileStore = new SegmentFileStore(tablePath,
+      segment.getSegmentFileName)
+    val segmentPath = CarbonTablePath.getSegmentPath(tablePath, segment.getSegmentNo)
+    if (segmentFileStore.getSegmentFile == null) {
+      indexFiles = new SegmentIndexFileStore()
+        .getMergeOrIndexFilesFromSegment(segmentPath).keySet
+    }
+    else {
+      indexFiles = segmentFileStore.getIndexAndMergeFiles.keySet
+    }
+    indexFiles
+  }
+
   /**
    * This method delete the carbondata files present in pertition of during small
    * datafile merge after loading a segment to SI table. It should be deleted after
    * data file merge operation, else, concurrency can cause file not found issues.
    */
-  private def deleteOldCarbonDataFiles(partition: CarbonSparkPartition): Unit = {
+  private def deleteOldCarbonDataFiles(partition: CarbonSparkPartition,
+      validSegmentsToUse: List[Segment]): Unit = {
     val splitList = partition.split.value.getAllSplits
     splitList.asScala.foreach { split =>
-      val carbonFile = FileFactory.getCarbonFile(split.getFilePath)
-      carbonFile.delete()
+      if (validSegmentsToUse.contains(split.getSegment)) {
+        val carbonFile = FileFactory.getCarbonFile(split.getFilePath)

Review comment:
```suggestion
        val mergedCarbonDataFile = FileFactory.getCarbonFile(split.getFilePath)
```
CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-822328365
Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5205/
CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-822354240
Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3458/
akashrn5 commented on a change in pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#discussion_r615721648

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
##########

@@ -140,19 +141,47 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
           .get
           .filterNot(streamingSegment.contains(_))
       }
+      validSegments.foreach { segment =>
+        if (segmentsToMerge.contains(segment.getSegmentNo)) {

Review comment: move this `if` check into a `.filter` before the `foreach`
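A small sketch of the refactor being asked for here: the membership check moves out of the loop body into a `.filter` ahead of the `foreach`. The `Seg` case class and the `println` stand in for CarbonData's `Segment` type and the merge-index call, purely for illustration.

```scala
// Illustration of "filter before foreach" with placeholder types and data.
object FilterBeforeForeach {
  // Placeholder segment type standing in for CarbonData's Segment.
  final case class Seg(segmentNo: String)

  def main(args: Array[String]): Unit = {
    val validSegments = Seq(Seg("0"), Seg("1"), Seg("2"))
    val segmentsToMerge = Set("0", "2")

    // Before: the check sits inside the loop body.
    validSegments.foreach { segment =>
      if (segmentsToMerge.contains(segment.segmentNo)) {
        println(s"merging index files for segment ${segment.segmentNo}")
      }
    }

    // After, as the reviewer suggests: filter first, then act on the kept segments.
    validSegments
      .filter(segment => segmentsToMerge.contains(segment.segmentNo))
      .foreach(segment => println(s"merging index files for segment ${segment.segmentNo}"))
  }
}
```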
CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-823040830
Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5216/
CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-823047966
Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3468/
ShreelekhyaG commented on a change in pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#discussion_r616422603

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
##########

Review comment: Done
ShreelekhyaG commented on a change in pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#discussion_r616422944

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
##########

Review comment: Done
akashrn5 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-823249962
retest this please
CarbonDataQA2 commented on pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#issuecomment-823322943
Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5217/