[GitHub] [carbondata] ShreelekhyaG opened a new pull request #3988: [WIP] Clean index files when clean files command executed


[GitHub] [carbondata] CarbonDataQA2 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox

CarbonDataQA2 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-790788850


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3350/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-795102087


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3768/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-795103596


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5533/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-795238610


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3303/
   





[GitHub] [carbondata] ShreelekhyaG commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

ShreelekhyaG commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-799169566


   retest this please





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-799257028


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3801/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-799257753


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5567/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-799663865


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5569/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-799665489


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3803/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-803623990


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5067/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-803641094


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3313/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-805645404


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5092/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-805648459


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3340/
   





[GitHub] [carbondata] ShreelekhyaG commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

ShreelekhyaG commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-805661810


   retest this please





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-805746949


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5093/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-805753259


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3341/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-813201151


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5122/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-813203171


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3371/
   





[GitHub] [carbondata] akashrn5 commented on a change in pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

akashrn5 commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r609339953



##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -163,23 +154,15 @@ public static void writeSegmentFile(String tablePath, String segmentId, String t
    * corresponding partitions.
    */
   public static void writeSegmentFile(String tablePath, final String taskNo, String location,

Review comment:
       this method is only called for partition tables, so please rename it to `writeSegmentFileForPartitionTable` so that later developers don't get confused.

##########
File path: core/src/main/java/org/apache/carbondata/core/index/IndexStoreManager.java
##########
@@ -511,16 +509,13 @@ public TableSegmentRefresher getTableSegmentRefresher(CarbonTable table) {
         UpdateVO updateVO =
             SegmentUpdateStatusManager.getInvalidTimestampRange(segment.getLoadMetadataDetails());
         SegmentRefreshInfo segmentRefreshInfo;
+        String segmentFileName = segment.getSegmentFileName();
         if ((updateVO != null && updateVO.getLatestUpdateTimestamp() != null)
-            || segment.getSegmentFileName() != null) {
-          long segmentFileTimeStamp;
-          if (null != segment.getLoadMetadataDetails()) {
-            segmentFileTimeStamp = segment.getLoadMetadataDetails().getLastModifiedTime();
-          } else {
-            segmentFileTimeStamp = FileFactory.getCarbonFile(CarbonTablePath
-                .getSegmentFilePath(table.getTablePath(), segment.getSegmentFileName()))
-                .getLastModifiedTime();
-          }
+            || segmentFileName != null) {
+          // get timestamp value from segment file name.

Review comment:
       please add a detailed comment here so that the next time anyone changes this, they know the impact. You can say something like:
   "Do not use the getLastModifiedTime API on the segment file carbon file object, as it will slow down operations on object stores like S3. The segment file is now always written for operations that previously overwrote it, so this timestamp can always be checked to decide whether to refresh the cache." Just put this into proper sentences.

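   The idea in the review comment above can be sketched as follows. This is a minimal, hypothetical illustration (class and method names are not from the CarbonData source): derive the cache-refresh timestamp from the segment file name itself, so no getLastModifiedTime metadata call is needed, which matters on object stores like S3. The file-name convention `<segmentId>_<timestamp>.<ext>` is assumed here.

```java
// Sketch only: parse the timestamp encoded in a segment file name such as
// "0_1609747155000.segment" instead of asking the file system for the
// last-modified time (a remote metadata call on S3-like stores).
public class SegmentFileNameTimestamp {

  // Assumes the illustrative convention <segmentId>_<timestamp>.<ext>.
  static long timestampFromFileName(String fileName) {
    String base = fileName.substring(0, fileName.lastIndexOf('.'));
    return Long.parseLong(base.substring(base.lastIndexOf('_') + 1));
  }

  public static void main(String[] args) {
    System.out.println(timestampFromFileName("0_1609747155000.segment"));
  }
}
```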
##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -440,6 +393,79 @@ public boolean accept(CarbonFile file) {
     return null;
   }
 
+  /**
+   * Get old and invalid files which have already been merged to a mergeindex file.In segment folder
+   * we may have both .index files and .mergeindex files, as we are not deleting index files
+   * immediately for old tables, this method reads mergeindex file and adds mapped index files to a
+   * list and returns.If more than one mergeindex file is present, considers the latest one as valid
+   * Ex: We have 3 files in segment. Segment0/ 1.index , 1.mergeindex file, 1.carbondata.
+   * 1.index is merged to 1.mergeindex. Here it returns merged index file - 1.index.
+   */
+  public static Set<String> getInvalidAndMergedIndexFiles(List<String> indexFiles)
+      throws IOException {
+    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+    Set<String> mergedAndInvalidIndexFiles = new HashSet<>();
+    long lastModifiedTime = 0L;
+    String validIndexFile = null;
+    List<String> mergeIndexFileNames = new ArrayList<>();
+    boolean isIndexFilesPresent = false;
+    for (String indexFile : indexFiles) {
+      if (indexFile.endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+        isIndexFilesPresent = true;
+      }
+      if (indexFile.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        // In case there are more than 1 mergeindex files present, latest one is considered as valid
+        // Ex: In SI table, after small files index merge we will have more than 1 mergeindex files
+        long timeStamp =
+            Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(indexFile));
+        if (timeStamp > lastModifiedTime) {
+          lastModifiedTime = timeStamp;
+          validIndexFile = indexFile;
+        }
+        mergeIndexFileNames.add(indexFile);
+      }
+    }
+    // get the invalid mergeindex files by excluding the valid file.
+    if (mergeIndexFileNames.size() > 1 && validIndexFile != null) {

Review comment:
       basically, this method handles three or four scenarios, so please mention all of them in the comments
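   One of those scenarios, the "latest mergeindex wins" rule from the diff above, can be sketched in isolation. This is an illustrative toy, not the CarbonData implementation; the file-name layout `<task>_<timestamp>.mergeindex` is an assumption of the sketch.

```java
import java.util.*;
import java.util.stream.*;

// Sketch: when several .mergeindex files exist in a segment (e.g. in an SI
// table after small-files index merge), the one with the largest timestamp
// in its name is valid and all others are stale.
public class LatestMergeIndexSketch {

  // Parse the timestamp from an illustrative name like "0_200.mergeindex".
  static long ts(String name) {
    String base = name.substring(0, name.indexOf('.'));
    return Long.parseLong(base.substring(base.lastIndexOf('_') + 1));
  }

  // Returns the stale mergeindex files, i.e. all but the newest one.
  static Set<String> staleMergeIndexFiles(List<String> mergeIndexFiles) {
    String valid = mergeIndexFiles.stream()
        .max(Comparator.comparingLong(LatestMergeIndexSketch::ts))
        .orElse(null);
    return mergeIndexFiles.stream()
        .filter(f -> !f.equals(valid))
        .collect(Collectors.toSet());
  }

  public static void main(String[] args) {
    List<String> files = Arrays.asList(
        "0_100.mergeindex", "0_200.mergeindex", "0_150.mergeindex");
    // Everything except the newest file (0_200.mergeindex) is stale.
    System.out.println(staleMergeIndexFiles(files));
  }
}
```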

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -440,6 +393,79 @@ public boolean accept(CarbonFile file) {
     return null;
   }
 
+  /**
+   * Get old and invalid files which have already been merged to a mergeindex file.In segment folder
+   * we may have both .index files and .mergeindex files, as we are not deleting index files
+   * immediately for old tables, this method reads mergeindex file and adds mapped index files to a
+   * list and returns.If more than one mergeindex file is present, considers the latest one as valid
+   * Ex: We have 3 files in segment. Segment0/ 1.index , 1.mergeindex file, 1.carbondata.
+   * 1.index is merged to 1.mergeindex. Here it returns merged index file - 1.index.
+   */
+  public static Set<String> getInvalidAndMergedIndexFiles(List<String> indexFiles)
+      throws IOException {
+    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+    Set<String> mergedAndInvalidIndexFiles = new HashSet<>();
+    long lastModifiedTime = 0L;
+    String validIndexFile = null;
+    List<String> mergeIndexFileNames = new ArrayList<>();
+    boolean isIndexFilesPresent = false;
+    for (String indexFile : indexFiles) {
+      if (indexFile.endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+        isIndexFilesPresent = true;
+      }
+      if (indexFile.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        // In case there are more than 1 mergeindex files present, latest one is considered as valid
+        // Ex: In SI table, after small files index merge we will have more than 1 mergeindex files
+        long timeStamp =
+            Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(indexFile));
+        if (timeStamp > lastModifiedTime) {
+          lastModifiedTime = timeStamp;
+          validIndexFile = indexFile;
+        }
+        mergeIndexFileNames.add(indexFile);
+      }
+    }
+    // get the invalid mergeindex files by excluding the valid file.
+    if (mergeIndexFileNames.size() > 1 && validIndexFile != null) {
+      final String validIndexFileName = validIndexFile;
+      mergedAndInvalidIndexFiles.addAll(
+          mergeIndexFileNames.stream().filter(file -> !file.equalsIgnoreCase(validIndexFileName))
+              .collect(Collectors.toSet()));
+    }
+    if (isIndexFilesPresent && validIndexFile != null) {
+      indexFileStore.readMergeFile(validIndexFile);
+      Map<String, List<String>> carbonMergeFileToIndexFilesMap =
+          indexFileStore.getCarbonMergeFileToIndexFilesMap();
+      String segmentPath =
+          validIndexFile.substring(0, validIndexFile.lastIndexOf(File.separator) + 1);
+      mergedAndInvalidIndexFiles.addAll(carbonMergeFileToIndexFilesMap.get(validIndexFile).stream()
+          .map(file -> segmentPath + file).collect(Collectors.toSet()));
+    }
+    if (mergeIndexFileNames.size() == 0 && indexFiles.size() > 1) {
+      // if more than two index files present with different timestamps, then stale/invalid
+      // data is present.
+      Long validFile = indexFiles.stream()
+          .map(file -> Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(file)))
+          .max(Long::compareTo).get();
+      mergedAndInvalidIndexFiles.addAll(
+          indexFiles.stream().filter(file -> !file.contains(validFile.toString()))
+              .collect(Collectors.toSet()));
+    }
+    return mergedAndInvalidIndexFiles;
+  }
+
+  public static CarbonFile[] getValidCarbonIndexFiles(CarbonFile[] carbonFiles) throws IOException {

Review comment:
       mention in a comment that this will be used for the local file system and any other default file system; you can rename the method as well. But why are we not using a common method in all places? I can see that for the local file system case we are getting valid files, but for HDFS and others we directly calculate the size; it will be wrong info if there are stale files, right?
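   The reviewer's concern about stale files skewing the size calculation can be shown with a small, hypothetical sketch (file names, sizes, and the helper are illustrative, not CarbonData APIs): summing over the raw segment listing over-counts unless merged/stale index files are excluded first.

```java
import java.util.*;

// Sketch: compute the segment size only over valid files. A naive sum over
// the whole directory would also count index files that were already merged
// into a .mergeindex file, reporting a size that is too large.
public class SegmentSizeSketch {

  static long validSize(Map<String, Long> fileSizes, Set<String> staleFiles) {
    return fileSizes.entrySet().stream()
        .filter(e -> !staleFiles.contains(e.getKey()))
        .mapToLong(Map.Entry::getValue)
        .sum();
  }

  public static void main(String[] args) {
    Map<String, Long> sizes = new HashMap<>();
    sizes.put("0_100.index", 10L);        // stale: already merged
    sizes.put("0_200.mergeindex", 12L);   // valid merged index
    sizes.put("part-0.carbondata", 500L); // valid data file
    long size = validSize(sizes, Collections.singleton("0_100.index"));
    System.out.println(size); // 512, not 522
  }
}
```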

##########
File path: integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java
##########
@@ -128,11 +130,13 @@ public void commitJob(JobContext jobContext) throws IOException {
       Configuration configuration = jobContext.getConfiguration();
       CarbonLoadModel carbonLoadModel = MapredCarbonOutputFormat.getLoadModel(configuration);
       ThreadLocalSessionInfo.unsetAll();
+      CarbonTable carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
+      String tablePath = carbonTable.getTablePath();
+      new CarbonIndexFileMergeWriter(carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable())
+          .mergeCarbonIndexFilesOfSegment(carbonLoadModel.getSegmentId(), tablePath, false,

Review comment:
       remove line 134,
   replace `carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable()` with `carbonTable`,
   and call getTablePath at line 136 itself

##########
File path: integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
##########
@@ -185,13 +184,24 @@ object CarbonMergeFilesRDD {
         val readPath: String = CarbonTablePath.getSegmentFilesLocation(tablePath) +
                                CarbonCommonConstants.FILE_SEPARATOR + segmentId + "_" +
                                segmentFileNameToSegmentIdMap.get(segmentId) + ".tmp"
+        val uuid = String.valueOf(System.currentTimeMillis)
+        val newSegmentFileName = SegmentFileStore.genSegmentFileName(segmentId, uuid)

Review comment:
       please add a comment here saying: do not use the timestamp present in the map; generate a new one instead of overwriting

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
##########
@@ -217,8 +216,16 @@ object SecondaryIndexUtil {
       }
       if (finalMergeStatus) {
         if (null != mergeStatus && mergeStatus.length != 0) {
+          // Segment file is not yet written during SI load, then we can delete old index
+          // files immediately. If segment file is already present and SI refresh is triggered, then
+          // do not delete immediately to avoid failures during parallel queries.
           val validSegmentsToUse = validSegments.asScala
-            .filter(segment => mergeStatus.map(_._2).toSet.contains(segment.getSegmentNo))
+            .filter(segment => {

Review comment:
       here, do we need to check whether this is from the load or rebuild command, to avoid immediate deletion?

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
##########
@@ -341,22 +350,6 @@ case class CarbonAddLoadCommand(
           carbonTable, segment, partitionSpecOp.orNull, partitionDataFiles.asJava)
       }
 
-    // This event will trigger merge index job, only trigger it if it is carbon file
-    if (isCarbonFormat) {
-      operationContext.setProperty(
-        carbonTable.getTableUniqueName + "_Segment",
-        model.getSegmentId)
-      // when this event is triggered, SI load listener will be called for all the SI tables under
-      // this main table, no need to load the SI tables for add load command, so add this property
-      // to check in SI load event listener to avoid loading to SI.
-      operationContext.setProperty("isAddLoad", "true")
-      val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =

Review comment:
       why is this code removed? Is the merge code no longer in a listener? The isAddLoad and other operation context properties are also removed.

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -440,6 +393,79 @@ public boolean accept(CarbonFile file) {
     return null;
   }
 
+  /**
+   * Get old and invalid files which have already been merged to a mergeindex file.In segment folder
+   * we may have both .index files and .mergeindex files, as we are not deleting index files
+   * immediately for old tables, this method reads mergeindex file and adds mapped index files to a
+   * list and returns.If more than one mergeindex file is present, considers the latest one as valid
+   * Ex: We have 3 files in segment. Segment0/ 1.index , 1.mergeindex file, 1.carbondata.

Review comment:
       can you please rewrite the comment in a more formatted way? You can refer to the Javadoc of any source file; just make it a paragraph with proper punctuation.

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -440,6 +393,79 @@ public boolean accept(CarbonFile file) {
     return null;
   }
 
+  /**
+   * Get old and invalid files which have already been merged to a mergeindex file.In segment folder
+   * we may have both .index files and .mergeindex files, as we are not deleting index files
+   * immediately for old tables, this method reads mergeindex file and adds mapped index files to a
+   * list and returns.If more than one mergeindex file is present, considers the latest one as valid
+   * Ex: We have 3 files in segment. Segment0/ 1.index , 1.mergeindex file, 1.carbondata.
+   * 1.index is merged to 1.mergeindex. Here it returns merged index file - 1.index.
+   */
+  public static Set<String> getInvalidAndMergedIndexFiles(List<String> indexFiles)
+      throws IOException {
+    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+    Set<String> mergedAndInvalidIndexFiles = new HashSet<>();
+    long lastModifiedTime = 0L;
+    String validIndexFile = null;
+    List<String> mergeIndexFileNames = new ArrayList<>();
+    boolean isIndexFilesPresent = false;
+    for (String indexFile : indexFiles) {
+      if (indexFile.endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+        isIndexFilesPresent = true;
+      }
+      if (indexFile.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        // In case there are more than 1 mergeindex files present, latest one is considered as valid
+        // Ex: In SI table, after small files index merge we will have more than 1 mergeindex files
+        long timeStamp =

Review comment:
       ```suggestion
           long indexFileTimeStamp =
   ```

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -163,23 +154,15 @@ public static void writeSegmentFile(String tablePath, String segmentId, String t
    * corresponding partitions.
    */
   public static void writeSegmentFile(String tablePath, final String taskNo, String location,
-      String timeStamp, List<String> partitionNames, boolean isMergeIndexFlow) throws IOException {
+      String timeStamp, List<String> partitionNames) throws IOException {
     String tempFolderLoc = timeStamp + ".tmp";
     String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
     CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
     if (!carbonFile.exists()) {
       carbonFile.mkdirs();
     }
-    CarbonFile tempFolder;
-    if (isMergeIndexFlow) {
-      tempFolder = FileFactory.getCarbonFile(location);
-    } else {
-      tempFolder = FileFactory
-          .getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
-    }
-
-    if ((tempFolder.exists() && partitionNames.size() > 0) || (isMergeIndexFlow
-        && partitionNames.size() > 0)) {
+    CarbonFile tempFolder = FileFactory.getCarbonFile(location);

Review comment:
       Here, can you please briefly explain the logic of the temp folder while writing the segment file for a partition table?

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -661,6 +687,20 @@ public boolean accept(CarbonFile file) {
         }
       });
       if (listFiles != null && listFiles.length > 0) {
+        Set<String> mergedAndInvalidIndexFiles = getInvalidAndMergedIndexFiles(
+            Arrays.stream(listFiles).map(file -> file.getAbsolutePath())
+                .collect(Collectors.toList()));
+        // Delete index files that are merged.
+        for (CarbonFile indexFile : listFiles) {
+          if (mergedAndInvalidIndexFiles.contains(indexFile.getAbsolutePath())) {
+            indexFile.delete();
+          }
+        }
+        if (!mergedAndInvalidIndexFiles.isEmpty()) {
+          listFiles = Arrays.stream(listFiles)

Review comment:
       ```suggestion
             carbonIndexFiles = Arrays.stream(listFiles)
   ```

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -199,23 +182,9 @@ public boolean accept(CarbonFile file) {
         folderDetails.setRelative(isRelative);
         folderDetails.setPartitions(partitionNames);
         folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
-        for (CarbonFile file : carbonFiles) {
-          if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
-            folderDetails.setMergeFileName(file.getName());
-          } else {
-            folderDetails.getFiles().add(file.getName());
-          }
-        }
+        setIndexFileNamesToFolderDetails(folderDetails, carbonFiles);
         segmentFile.addPath(location, folderDetails);
-        String path = null;
-        if (isMergeIndexFlow) {
-          // in case of merge index flow, tasks are launched per partition and all the tasks
-          // will be written to the same tmp folder, in that case taskNo is not unique.
-          // To generate a unique fileName UUID is used

Review comment:
       please add this comment back. Take care not to delete comments unless they are of no use, as they help future developers and make debugging easier.

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -661,6 +687,20 @@ public boolean accept(CarbonFile file) {
         }

Review comment:
       `CarbonFile[] listFiles = carbonFile.listFiles(file -> CarbonTablePath.isCarbonIndexFile(file.getAbsolutePath()));`, use a lambda.
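       The suggested lambda form can be sketched with plain strings (a hypothetical stand-alone example; in CarbonData the filter is applied through `CarbonFile.listFiles` with `CarbonTablePath.isCarbonIndexFile`):

   ```java
   import java.util.Arrays;

   public class LambdaFilterSketch {
       // Hypothetical stand-in for CarbonTablePath.isCarbonIndexFile.
       static boolean isIndexFile(String name) {
           return name.endsWith(".carbonindex") || name.endsWith(".carbonindexmerge");
       }

       // Lambda-based filtering, as the review suggests, instead of an
       // anonymous CarbonFileFilter implementation.
       static String[] filterIndexFiles(String[] names) {
           return Arrays.stream(names)
               .filter(LambdaFilterSketch::isIndexFile)
               .toArray(String[]::new);
       }

       public static void main(String[] args) {
           String[] files = {"0_1.carbonindex", "part-0.carbondata", "1.carbonindexmerge"};
           System.out.println(filterIndexFiles(files).length); // prints 2
       }
   }
   ```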
   

##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
##########
@@ -135,15 +139,24 @@ private void prepareLoadMetadata() {
     } else {
       segName = segment.getSegmentFileName();
     }
-    List<String> index = snapShot.get(segName);
-    if (null == index) {
-      index = new LinkedList<>();
+    List<String> indexFiles = snapShot.get(segName);
+    if (null == indexFiles) {
+      indexFiles = new LinkedList<>();
     }
-    for (String indexPath : index) {
-      if (indexPath.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
-        indexFileStore.put(indexPath, indexPath.substring(indexPath.lastIndexOf('/') + 1));
+    Set<String> mergedIndexFiles =
+        SegmentFileStore.getInvalidAndMergedIndexFiles(indexFiles);
+    if (!mergedIndexFiles.isEmpty()) {
+      // do not include details of index files that are already merged.
+      indexFiles = indexFiles.stream().filter(

Review comment:
       Assign the filtered list to a new variable and call it `filteredIndexFiles`.

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1341,13 +1375,13 @@ public static void removeTempFolder(Map<String, FolderDetails> locationMap, Stri
   public static Set<String> getIndexFilesListForSegment(Segment segment, String tablePath)

Review comment:
       I think you can move this method to SecondaryIndexUtil and make it private, as it is used only for that case. Don't keep it in a common place.

##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
##########
@@ -79,13 +82,23 @@ public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
   @Override
   public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException {
     Map<String, String> indexFiles;
-    if (segment.getSegmentFileName() == null) {
+    SegmentFileStore fileStore = null;
+    if (segment.getSegmentFileName() != null) {
+      fileStore = new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
+    }
+    if (segment.getSegmentFileName() == null || fileStore.getSegmentFile() == null) {
       String path =
           CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
       indexFiles = new SegmentIndexFileStore().getMergeOrIndexFilesFromSegment(path);
+      List<String> indexFileList = new ArrayList<>(indexFiles.keySet());
+      Set<String> mergedIndexFiles = SegmentFileStore.getInvalidAndMergedIndexFiles(indexFileList);
+      for (String indexFile : indexFileList) {

Review comment:
       Replace the for loop with a lambda (stream filter) and do the filtering there.
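       The suggested refactor can be sketched as follows (a hypothetical simplification; `dropMergedEntries` is an illustrative helper, not a CarbonData method):

   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;

   public class MapFilterSketch {
       // Instead of a for loop that removes merged index files one by one,
       // filter the map in place with a lambda-based removeIf.
       static void dropMergedEntries(Map<String, String> indexFiles, Set<String> mergedIndexFiles) {
           indexFiles.keySet().removeIf(mergedIndexFiles::contains);
       }

       public static void main(String[] args) {
           Map<String, String> indexFiles = new HashMap<>();
           indexFiles.put("/Segment_0/1.carbonindex", "1.carbonindex");
           indexFiles.put("/Segment_0/1.carbonindexmerge", "1.carbonindexmerge");
           Set<String> merged = new HashSet<>();
           merged.add("/Segment_0/1.carbonindex");
           dropMergedEntries(indexFiles, merged);
           System.out.println(indexFiles.size()); // prints 1
       }
   }
   ```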

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -661,6 +687,20 @@ public boolean accept(CarbonFile file) {
         }
       });
       if (listFiles != null && listFiles.length > 0) {
+        Set<String> mergedAndInvalidIndexFiles = getInvalidAndMergedIndexFiles(
+            Arrays.stream(listFiles).map(file -> file.getAbsolutePath())

Review comment:
       `Arrays.stream(listFiles).map(CarbonFile::getAbsolutePath)`
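       The two forms are behaviorally identical; the method reference just removes the lambda boilerplate. A stand-alone sketch (using `java.io.File` in place of `CarbonFile`):

   ```java
   import java.io.File;
   import java.util.Arrays;
   import java.util.List;
   import java.util.stream.Collectors;

   public class MethodRefSketch {
       // Method-reference form, as the review suggests:
       static List<String> toPaths(File[] listFiles) {
           return Arrays.stream(listFiles)
               .map(File::getAbsolutePath)
               .collect(Collectors.toList());
       }

       public static void main(String[] args) {
           File[] listFiles = {new File("1.carbonindex"), new File("2.carbonindex")};
           // Lambda form used in the patch; produces the same result:
           List<String> viaLambda = Arrays.stream(listFiles)
               .map(file -> file.getAbsolutePath())
               .collect(Collectors.toList());
           System.out.println(viaLambda.equals(toPaths(listFiles))); // prints true
       }
   }
   ```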

##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
##########
@@ -135,15 +139,24 @@ private void prepareLoadMetadata() {
     } else {
       segName = segment.getSegmentFileName();
     }
-    List<String> index = snapShot.get(segName);
-    if (null == index) {
-      index = new LinkedList<>();
+    List<String> indexFiles = snapShot.get(segName);
+    if (null == indexFiles) {
+      indexFiles = new LinkedList<>();
     }
-    for (String indexPath : index) {
-      if (indexPath.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
-        indexFileStore.put(indexPath, indexPath.substring(indexPath.lastIndexOf('/') + 1));
+    Set<String> mergedIndexFiles =
+        SegmentFileStore.getInvalidAndMergedIndexFiles(indexFiles);
+    if (!mergedIndexFiles.isEmpty()) {
+      // do not include details of index files that are already merged.
+      indexFiles = indexFiles.stream().filter(
+          file -> !mergedIndexFiles.contains(file))
+          .collect(Collectors.toList());
+    }
+    for (String indexFile : indexFiles) {
+      if (indexFile.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        indexFileStore

Review comment:
       If it contains a merge index, there will be only one merge index file per segment, so there is no need to loop through all files; you can just break out of the loop.
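       A short-circuiting stream expresses the same "stop at the first match" idea (a hypothetical sketch; `findMergeIndex` is an illustrative helper relying on the segment holding at most one mergeindex file):

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.Optional;

   public class MergeIndexLookupSketch {
       // A segment holds at most one .carbonindexmerge file, so findFirst
       // short-circuits as soon as it is seen, equivalent to break-ing out
       // of a for loop on the first match.
       static Optional<String> findMergeIndex(List<String> indexFiles) {
           return indexFiles.stream()
               .filter(name -> name.endsWith(".carbonindexmerge"))
               .findFirst();
       }

       public static void main(String[] args) {
           List<String> files = Arrays.asList("1.carbonindex", "1.carbonindexmerge", "2.carbonindex");
           System.out.println(findMergeIndex(files).orElse("none")); // prints 1.carbonindexmerge
       }
   }
   ```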

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##########
@@ -491,19 +491,8 @@ object CarbonDataRDDFactory {
         val segmentMetaDataInfo = CommonLoadUtils.getSegmentMetaDataInfoFromAccumulator(
           carbonLoadModel.getSegmentId,
           segmentMetaDataAccumulator)
-        val segmentFileName =
-          SegmentFileStore.writeSegmentFile(carbonTable, carbonLoadModel.getSegmentId,
-            String.valueOf(carbonLoadModel.getFactTimeStamp), segmentMetaDataInfo)
-        // clear segmentMetaDataAccumulator
         segmentMetaDataAccumulator.reset()
 
-        SegmentFileStore.updateTableStatusFile(

Review comment:
       This method should be called after writing the segment file, right? I don't see it in the changes. Or does the method at line 507 already take care of it?

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -661,6 +687,20 @@ public boolean accept(CarbonFile file) {
         }
       });
       if (listFiles != null && listFiles.length > 0) {
+        Set<String> mergedAndInvalidIndexFiles = getInvalidAndMergedIndexFiles(
+            Arrays.stream(listFiles).map(file -> file.getAbsolutePath())
+                .collect(Collectors.toList()));
+        // Delete index files that are merged.
+        for (CarbonFile indexFile : listFiles) {
+          if (mergedAndInvalidIndexFiles.contains(indexFile.getAbsolutePath())) {
+            indexFile.delete();

Review comment:
       Here we delete the files immediately, so there is no need to wait for the clean files command to do it? Won't this lead to query failures? If not, please add a comment in the code explaining why we can delete immediately here but not in other places.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
##########
@@ -131,7 +125,14 @@ object Compactor {
           secondaryIndexModel.segmentIdToLoadStartTimeMapping,
           indexCarbonTable,
           loadMetadataDetails.toList.asJava, carbonLoadModelForMergeDataFiles)(sqlContext)
-
+        if (rebuiltSegments.isEmpty) {
+          for (eachSegment <- secondaryIndexModel.validSegments) {
+            SegmentFileStore
+              .writeSegmentFile(indexCarbonTable,

Review comment:
       Just confirm: what happens if writing the segment file fails for any of the segments?

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
##########
@@ -410,6 +379,24 @@ object SecondaryIndexCreator {
               indexTable,
               secondaryIndexModel.sqlContext.sparkSession)
         }
+        if (rebuiltSegments.isEmpty) {
+          for (loadMetadata <- loadMetadataDetails) {
+            SegmentFileStore
+              .writeSegmentFile(indexCarbonTable, loadMetadata.getLoadName,

Review comment:
       same as above

##########
File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
##########
@@ -95,28 +97,27 @@ private String mergeCarbonIndexFilesOfSegment(String segmentId,
               indexFilesInPartition.add(indexCarbonFile);
             }
           }
-          indexFiles = indexFilesInPartition.toArray(new CarbonFile[indexFilesInPartition.size()]);
+          indexFiles = indexFilesInPartition;
         } else {
-          indexFiles = indexCarbonFiles.toArray(new CarbonFile[indexCarbonFiles.size()]);
+          indexFiles = indexCarbonFiles;
         }
-      } else {
-        indexFiles = SegmentIndexFileStore
-            .getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration(), uuid);
       }
-      if (isCarbonIndexFilePresent(indexFiles) || indexFileNamesTobeAdded != null) {
-        if (sfs == null) {
-          return writeMergeIndexFileBasedOnSegmentFolder(indexFileNamesTobeAdded,
-              readFileFooterFromCarbonDataFile, segmentPath, indexFiles, segmentId, uuid);
-        } else {
-          return writeMergeIndexFileBasedOnSegmentFile(segmentId, indexFileNamesTobeAdded, sfs,
-              indexFiles, uuid, partitionPath);
+      if (sfs == null || indexFiles.isEmpty()) {
+        if (table.isHivePartitionTable()) {
+          segmentPath = partitionPath;
         }
+        return writeMergeIndexFileBasedOnSegmentFolder(indexFileNamesTobeAdded,
+            isOldStoreIndexFilesPresent, segmentPath, segmentId, uuid, true);
+      } else {
+        return writeMergeIndexFileBasedOnSegmentFile(segmentId, indexFileNamesTobeAdded,
+            isOldStoreIndexFilesPresent, sfs,
+            indexFiles.toArray(new CarbonFile[indexFiles.size()]), uuid, partitionPath);

Review comment:
   ```suggestion
               indexFiles.toArray(new CarbonFile[0]), uuid, partitionPath);
   ```
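       The empty-array form of `toArray` is the recommended idiom: the collection allocates a correctly sized array internally, and the call site cannot get the size wrong. A quick stand-alone sketch:

   ```java
   import java.util.Arrays;
   import java.util.List;

   public class ToArraySketch {
       // Empty-array form, as the review suggests:
       static String[] asArray(List<String> indexFiles) {
           return indexFiles.toArray(new String[0]);
       }

       public static void main(String[] args) {
           List<String> indexFiles = Arrays.asList("1.carbonindex", "2.carbonindex");
           // Pre-sized form, as in the patch; the result is identical:
           String[] sized = indexFiles.toArray(new String[indexFiles.size()]);
           System.out.println(Arrays.equals(sized, asArray(indexFiles))); // prints true
       }
   }
   ```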

##########
File path: index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergeindex/CarbonIndexFileMergeTestCaseWithSI.scala
##########
@@ -246,19 +246,22 @@ class CarbonIndexFileMergeTestCaseWithSI
         CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
   }
 
-  private def getIndexFileCount(tableName: String, segment: String): Int = {
+  private def getIndexFileCount(tableName: String,
+      segment: String,
+      extension: String = CarbonTablePath.INDEX_FILE_EXT): Int = {
     val table = CarbonMetadata.getInstance().getCarbonTable(tableName)
     val path = CarbonTablePath
       .getSegmentPath(table.getAbsoluteTableIdentifier.getTablePath, segment)
     val carbonFiles = FileFactory.getCarbonFile(path).listFiles(new CarbonFileFilter {
-      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(CarbonTablePath
-        .INDEX_FILE_EXT)
+      override def accept(file: CarbonFile): Boolean = {
+        file.getName.endsWith(CarbonTablePath.INDEX_FILE_EXT) ||

Review comment:
       I think this code is also present in other test classes; please refactor it into a test util class and reuse it.

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
##########
@@ -530,21 +615,28 @@ class CarbonIndexFileMergeTestCase
       FileFactory.getCarbonFile(table.getAbsoluteTableIdentifier.getTablePath)
         .listFiles(true, new CarbonFileFilter {
           override def accept(file: CarbonFile): Boolean = {
-            file.getName.endsWith(extension)
+            file.getName.endsWith(CarbonTablePath.INDEX_FILE_EXT) ||
+            file.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)
           }
         })
     } else {
       FileFactory.getCarbonFile(path).listFiles(true, new CarbonFileFilter {
         override def accept(file: CarbonFile): Boolean = {
-          file.getName.endsWith(extension)
+          file.getName.endsWith(CarbonTablePath.INDEX_FILE_EXT) ||
+          file.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)
         }
       })
     }
-    if (carbonFiles != null) {
-      carbonFiles.size()
-    } else {
-      0
-    }
+    var validIndexFiles = SegmentFileStore.getValidCarbonIndexFiles(carbonFiles.asScala.toArray)
+    validIndexFiles = validIndexFiles.toStream
+      .filter(file => file.getName.endsWith(extension)).toArray
+    validIndexFiles.length
+  }
+
+  def getSegmentFileCount(tableName: String): Int = {

Review comment:
       Please check whether similar code is written in other test cases; if so, move it to a test util.

##########
File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
##########
@@ -184,30 +185,49 @@ private String mergeCarbonIndexFilesOfSegment(String segmentId,
     return indexLocationMap;
   }
 
-  private String writeMergeIndexFileBasedOnSegmentFolder(List<String> indexFileNamesTobeAdded,
-      boolean readFileFooterFromCarbonDataFile, String segmentPath, CarbonFile[] indexFiles,
-      String segmentId, String uuid) throws IOException {
+  public String writeMergeIndexFileBasedOnSegmentFolder(List<String> indexFileNamesTobeAdded,
+      boolean isOldStoreIndexFilesPresent, String segmentPath,
+      String segmentId, String uuid, boolean readBasedOnUUID) throws IOException {
+    CarbonFile[] indexFiles = null;
     SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
-    if (readFileFooterFromCarbonDataFile) {
+    if (isOldStoreIndexFilesPresent) {
       // this case will be used in case of upgrade where old store will not have the blocklet
       // info in the index file and therefore blocklet info need to be read from the file footer
       // in the carbondata file
-      fileStore.readAllIndexAndFillBlockletInfo(segmentPath, uuid);
+      fileStore.readAllIndexAndFillBlockletInfo(segmentPath, null);

Review comment:
       Why send null here?

##########
File path: processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
##########
@@ -1163,6 +1166,20 @@ public static String mergeIndexFilesInPartitionedSegment(CarbonTable table, Stri
             tempFolderPath, currPartitionSpec);
   }
 
+  public static String mergeIndexFilesInTempSegment(CarbonTable table, String segmentId,

Review comment:
       Please add a method-level comment: what is this temp folder and when will it be used?

##########
File path: processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
##########
@@ -1145,13 +1145,16 @@ public static void addIndexSizeIntoMetaEntry(LoadMetadataDetails loadMetadataDet
    * @param table
    * @param segmentId
    * @param uuid
+   * @param partitionPath

Review comment:
       If you are not giving any explanation for a parameter, there is no need to document it.

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -439,6 +430,73 @@ public boolean accept(CarbonFile file) {
     return null;
   }
 
+  /**
+   * Get old and invalid index files which have already been merged into a mergeindex file.
+   * In a segment folder we may have both .index files and .mergeindex files; since index files
+   * are not deleted immediately for old tables, this method reads the mergeindex file, collects
+   * the index files mapped in it, and returns them. If more than one mergeindex file is present,
+   * the latest one is considered valid.
+   * Ex: a segment contains 3 files: Segment0/1.index, 1.mergeindex and 1.carbondata, where
+   * 1.index has been merged into 1.mergeindex. Here the method returns the merged file 1.index.
+   */
+  public static Set<String> getInvalidAndMergedIndexFiles(List<String> indexFiles)
+      throws IOException {
+    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();

Review comment:
       Please update the method's comment to reflect the same.
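       The documented behavior can be sketched in isolation (a hypothetical simplification; in CarbonData the mergeindex contents are read from disk via `SegmentIndexFileStore`, and the helper name here is illustrative):

   ```java
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   public class InvalidIndexFilesSketch {
       // Given the index-file names recorded inside each mergeindex file,
       // every plain .index file that appears in a mergeindex is stale and
       // can be treated as invalid, which mirrors getInvalidAndMergedIndexFiles.
       static Set<String> invalidAndMerged(Map<String, List<String>> mergeIndexContents) {
           Set<String> merged = new HashSet<>();
           for (List<String> mappedIndexFiles : mergeIndexContents.values()) {
               merged.addAll(mappedIndexFiles);
           }
           return merged;
       }

       public static void main(String[] args) {
           Map<String, List<String>> contents = new HashMap<>();
           contents.put("1.carbonindexmerge", Arrays.asList("1.carbonindex"));
           System.out.println(invalidAndMerged(contents)); // prints [1.carbonindex]
       }
   }
   ```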




--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-817568368


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3398/
   



