
[GitHub] [carbondata] ShreelekhyaG commented on a change in pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

Posted by GitBox on Nov 03, 2020; 1:41pm
URL: http://apache-carbondata-dev-mailing-list-archive.168.s1.nabble.com/GitHub-carbondata-ShreelekhyaG-opened-a-new-pull-request-3988-WIP-Clean-index-files-when-clean-filesd-tp102150p103058.html


ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r515750468



##########
File path: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
##########
@@ -368,12 +373,21 @@ public static void getCarbonIndexFilesRecursively(CarbonFile carbonFile,
     return carbonFile.listFiles(new CarbonFileFilter() {
       @Override
       public boolean accept(CarbonFile file) {
-        return ((file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
-            .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) && file.getSize() > 0);
+        return (!oldIndexAndMergeIndexFiles.contains(file.getAbsolutePath()) && (
+            file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
+                .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) && file.getSize() > 0);
       }
     });
   }
 
+  public static List<String> getOldIndexAndMergeIndexFiles() {
+    return oldIndexAndMergeIndexFiles;
+  }
+
+  public static void setOldIndexAndMergeIndexFiles(List<String> oldIndexAndMergeIndexFiles) {

Review comment:
       While writing the merge index file / segment file, it gets the index files from the segment directory. During the SI small-files merge step we will have `old.index` and `new.index` files, and only `new.index` is valid for the merge step and for writing the segment file. For the main table we could also store and use it like this for a normal load, but in a few cases, such as add segment or reading from an external location, we will have to read the merge index file to identify the invalid index files. And for clean files, I am getting all index files from the segment directory and eliminating those that are not present in the segment file.
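
       A minimal sketch of that filtering idea (a hypothetical standalone snippet: `segmentDir` is an assumed `CarbonFile` handle for the segment directory; the accessor is the one added in this diff):

```java
import java.util.List;

import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.util.path.CarbonTablePath;

// List only valid index/merge index files: skip anything already collected
// as stale (e.g. `old.index` left behind by the SI small-files merge).
List<String> staleFiles = SegmentIndexFileStore.getOldIndexAndMergeIndexFiles();
CarbonFile[] validIndexFiles = segmentDir.listFiles(new CarbonFileFilter() {
  @Override
  public boolean accept(CarbonFile file) {
    return !staleFiles.contains(file.getAbsolutePath())
        && (file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)
            || file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT))
        && file.getSize() > 0;
  }
});
```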

##########
File path: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
##########
@@ -368,12 +373,21 @@ public static void getCarbonIndexFilesRecursively(CarbonFile carbonFile,
     return carbonFile.listFiles(new CarbonFileFilter() {

Review comment:
       OK, removed.

##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
##########
@@ -139,11 +141,19 @@ private void prepareLoadMetadata() {
     if (null == index) {

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
##########
@@ -139,11 +141,19 @@ private void prepareLoadMetadata() {
     if (null == index) {
       index = new LinkedList<>();
     }
+    CarbonFile[] carbonIndexFiles =
+        index.stream().map(FileFactory::getCarbonFile).toArray(CarbonFile[]::new);
+    List<String> mergedIndexFiles =
+        SegmentFileStore.getInvalidAndMergedIndexFiles(carbonIndexFiles);
     for (String indexPath : index) {

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
##########
@@ -79,18 +83,30 @@ public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
   @Override
   public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException {
     Map<String, String> indexFiles;
-    if (segment.getSegmentFileName() == null) {
+    SegmentFileStore fileStore =
+        new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
+    if (fileStore.getSegmentFile() == null) {
       String path =
           CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
       indexFiles = new SegmentIndexFileStore().getMergeOrIndexFilesFromSegment(path);
     } else {
-      SegmentFileStore fileStore =
-          new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
       indexFiles = fileStore.getIndexOrMergeFiles();
       if (fileStore.getSegmentFile() != null) {
         segment.setSegmentMetaDataInfo(fileStore.getSegmentFile().getSegmentMetaDataInfo());
       }
     }
+    List<String> index = new ArrayList<>(indexFiles.keySet());
+    CarbonFile[] carbonIndexFiles =
+        index.stream().map(FileFactory::getCarbonFile).toArray(CarbonFile[]::new);

Review comment:
       Done. `getInvalidAndMergedIndexFiles` takes a `List<String>` as argument and returns a `List<String>`.
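
       A short sketch of the resulting call site, assuming the updated `List<String>`-based signature described above (`fileStore` is the `SegmentFileStore` from the diff):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.carbondata.core.metadata.SegmentFileStore;

// Committed index files for the segment, keyed by index file path.
Map<String, String> indexFiles = fileStore.getIndexOrMergeFiles();
List<String> index = new ArrayList<>(indexFiles.keySet());
// Drop the entries the helper reports as invalid or already merged.
List<String> invalidAndMerged = SegmentFileStore.getInvalidAndMergedIndexFiles(index);
for (String invalid : invalidAndMerged) {
  indexFiles.remove(invalid);
}
```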

##########
File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
##########
@@ -95,22 +97,20 @@ private String mergeCarbonIndexFilesOfSegment(String segmentId,
               indexFilesInPartition.add(indexCarbonFile);
             }
           }
-          indexFiles = indexFilesInPartition.toArray(new CarbonFile[indexFilesInPartition.size()]);
+          indexFiles = indexFilesInPartition;
         } else {
-          indexFiles = indexCarbonFiles.toArray(new CarbonFile[indexCarbonFiles.size()]);
+          indexFiles = indexCarbonFiles;
         }
+      }
+      if (indexFiles.isEmpty() || indexFileNamesTobeAdded != null) {
+        return writeMergeIndexFileBasedOnSegmentFolder(indexFileNamesTobeAdded,
+            readFileFooterFromCarbonDataFile, segmentPath,
+            indexFiles.toArray(new CarbonFile[indexFiles.size()]), segmentId);
       } else {
-        indexFiles =
+        CarbonFile[] indexFilesFromFile =

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
##########
@@ -95,22 +97,20 @@ private String mergeCarbonIndexFilesOfSegment(String segmentId,
               indexFilesInPartition.add(indexCarbonFile);
             }
           }
-          indexFiles = indexFilesInPartition.toArray(new CarbonFile[indexFilesInPartition.size()]);
+          indexFiles = indexFilesInPartition;
         } else {
-          indexFiles = indexCarbonFiles.toArray(new CarbonFile[indexCarbonFiles.size()]);
+          indexFiles = indexCarbonFiles;
         }
+      }
+      if (indexFiles.isEmpty() || indexFileNamesTobeAdded != null) {
+        return writeMergeIndexFileBasedOnSegmentFolder(indexFileNamesTobeAdded,
+            readFileFooterFromCarbonDataFile, segmentPath,
+            indexFiles.toArray(new CarbonFile[indexFiles.size()]), segmentId);

Review comment:
       done

##########
File path: integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoInsertIntoTableTestCase.scala
##########
@@ -188,8 +188,22 @@ class PrestoInsertIntoTableTestCase
     val absoluteTableIdentifier: AbsoluteTableIdentifier = getAbsoluteIdentifier("testdb",
       "testtable")
     val carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier)
-    val segmentFoldersLocation = CarbonTablePath.getPartitionDir(carbonTable.getTablePath)
-    assert(FileFactory.getCarbonFile(segmentFoldersLocation).listFiles(false).size() == 8)
+    val segment0 = CarbonTablePath
+      .getSegmentPath(carbonTable.getAbsoluteTableIdentifier.getTablePath, "0")

Review comment:
       done

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
##########
@@ -310,20 +284,22 @@ object SecondaryIndexUtil {
   /**
    * This method deletes the old index files or merge index file after data files merge
    */
-  private def deleteOldIndexOrMergeIndexFiles(
+  private def collectOldIndexOrMergeIndexFiles(
       factTimeStamp: Long,
       validSegments: util.List[Segment],
       indexCarbonTable: CarbonTable): Unit = {
+    val oldIndexAndMergeIndexFiles = new util.ArrayList[String]
     // delete the index/merge index carbonFile of old data files

Review comment:
       done

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
##########
@@ -310,20 +284,22 @@ object SecondaryIndexUtil {
   /**
    * This method deletes the old index files or merge index file after data files merge
    */
-  private def deleteOldIndexOrMergeIndexFiles(
+  private def collectOldIndexOrMergeIndexFiles(
       factTimeStamp: Long,
       validSegments: util.List[Segment],
       indexCarbonTable: CarbonTable): Unit = {
+    val oldIndexAndMergeIndexFiles = new util.ArrayList[String]
     // delete the index/merge index carbonFile of old data files
     validSegments.asScala.foreach { segment =>
       SegmentFileStore.getIndexFilesListForSegment(segment, indexCarbonTable.getTablePath)
         .asScala
         .foreach { indexFile =>
           if (DataFileUtil.getTimeStampFromFileName(indexFile).toLong < factTimeStamp) {
-            FileFactory.getCarbonFile(indexFile).delete()
+            oldIndexAndMergeIndexFiles.add(indexFile)
           }
         }
     }
+    SegmentIndexFileStore.setOldIndexAndMergeIndexFiles(oldIndexAndMergeIndexFiles)

Review comment:
       Modified one existing test case and added one to check data file merge without enabling the index merge property.

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -353,6 +356,79 @@ object CarbonStore {
     }
   }
 
+  /**
+   * Clean invalid and expired index files of carbon table.
+   *
+   * @param carbonTable CarbonTable
+   */
+  def cleanUpIndexFiles(carbonTable: CarbonTable): Unit = {
+    val validSegments = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+      .getValidAndInvalidSegments.getValidSegments.asScala.toList

Review comment:
       `deleteLoadsAndUpdateMetadata` reads the load metadata and is called from multiple places, so I think it's better to read it inside the method.
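
       A rough sketch of that trade-off (reading the table status inside the method rather than threading it through every caller; `carbonTable` is assumed in scope):

```java
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;

// Each caller of deleteLoadsAndUpdateMetadata gets a fresh view of the
// table status; the cost is one extra metadata read per invocation.
LoadMetadataDetails[] details =
    SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
```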

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
##########
@@ -352,27 +343,20 @@ object SecondaryIndexCreator {
       var tableStatusUpdateForFailure = false
 
       if (successSISegments.nonEmpty && !isCompactionCall) {
-        tableStatusUpdateForSuccess = FileInternalUtil.updateTableStatus(
-          successSISegments,
-          secondaryIndexModel.carbonLoadModel.getDatabaseName,
-          secondaryIndexModel.secondaryIndex.indexName,
-          SegmentStatus.INSERT_IN_PROGRESS,
-          secondaryIndexModel.segmentIdToLoadStartTimeMapping,
-          segmentToLoadStartTimeMap,
-          indexCarbonTable,
-          secondaryIndexModel.sqlContext.sparkSession)
-
         // merge index files for success segments in case of only load
         CarbonMergeFilesRDD.mergeIndexFiles(secondaryIndexModel.sqlContext.sparkSession,
           successSISegments,
           segmentToLoadStartTimeMap,
           indexCarbonTable.getTablePath,
           indexCarbonTable, mergeIndexProperty = false)
-
         val loadMetadataDetails = SegmentStatusManager
           .readLoadMetadata(indexCarbonTable.getMetadataPath)
           .filter(loadMetadataDetail => successSISegments.contains(loadMetadataDetail.getLoadName))
-
+        for (loadMetadata <- loadMetadataDetails) {

Review comment:
       Done

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -464,6 +472,38 @@ public boolean accept(CarbonFile file) {
     return null;
   }
 
+  public static List<String> getMergedIndexFiles(CarbonFile[] indexFiles) throws IOException {
+    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+    List<String> mergedIndexFiles = new ArrayList<>();
+    long lastModifiedTime = 0L;
+    long length = 0L;
+    CarbonFile validMergeIndexFile = null;
+    List<CarbonFile> mergeIndexCarbonFiles = new ArrayList<>();
+    for (CarbonFile file : indexFiles) {
+      if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        indexFileStore.readMergeFile(file.getCanonicalPath());
+        Map<String, List<String>> carbonMergeFileToIndexFilesMap =
+            indexFileStore.getCarbonMergeFileToIndexFilesMap();
+        mergedIndexFiles.addAll(carbonMergeFileToIndexFilesMap.get(file.getCanonicalPath()));
+        // In case there are more than 1 mergeindex files present, get the latest one.
+        if (file.getLastModifiedTime() > lastModifiedTime || file.getLength() > length) {

Review comment:
       done
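
       For reference, a minimal sketch of "more than one merge index file present, keep the latest", assuming last-modified time alone decides which one is valid:

```java
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.util.path.CarbonTablePath;

// When a segment holds several merge index files, treat the most recently
// modified one as valid; the rest are stale and their mapped index
// entries are discarded.
CarbonFile validMergeIndexFile = null;
for (CarbonFile file : indexFiles) {
  if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)
      && (validMergeIndexFile == null
          || file.getLastModifiedTime() > validMergeIndexFile.getLastModifiedTime())) {
    validMergeIndexFile = file;
  }
}
```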

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -353,6 +356,79 @@ object CarbonStore {
     }
   }
 
+  /**
+   * Clean invalid and expired index files of carbon table.
+   *
+   * @param carbonTable CarbonTable
+   */
+  def cleanUpIndexFiles(carbonTable: CarbonTable): Unit = {
+    val validSegments = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+      .getValidAndInvalidSegments.getValidSegments.asScala.toList
+    validSegments.foreach( segment => {
+      val sfs: SegmentFileStore = new SegmentFileStore(carbonTable.getTablePath,
+        segment.getSegmentFileName)
+      var indexFiles = List[CarbonFile]()
+      if (carbonTable.isHivePartitionTable) {
+        val partitionSpecs = sfs.getPartitionSpecs.asScala.toList
+        val segmentName = segment.getSegmentFileName.replace(CarbonTablePath.SEGMENT_EXT, "")
+        for (partitionSpec <- partitionSpecs) {
+          var carbonIndexFiles = SegmentIndexFileStore
+            .getCarbonIndexFiles(partitionSpec.getLocation.toString, FileFactory.getConfiguration)
+            .toList
+          carbonIndexFiles = carbonIndexFiles.filter(x => x.getAbsolutePath
+            .contains(segmentName.substring(
+              segmentName.indexOf(CarbonCommonConstants.UNDERSCORE) + 1, segmentName.length)))
+          indexFiles = indexFiles ++ carbonIndexFiles
+          cleanUpIndexFilesForSegment(sfs, indexFiles)
+        }
+      } else {
+        val segmentPath: String = carbonTable.getSegmentPath(segment.getSegmentNo)
+        val carbonIndexFiles = SegmentIndexFileStore
+          .getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration)

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -327,6 +315,21 @@ public static boolean writeSegmentFile(CarbonTable carbonTable, Segment segment)
     return false;
   }
 
+  public static void setIndexFileNamesToFolderDetails(FolderDetails folderDetails,
+      CarbonFile[] indexFiles) throws IOException {
+    List<String> mergedAndInvalidIndexFiles = getInvalidAndMergedIndexFiles(indexFiles);
+    for (CarbonFile file : indexFiles) {
+      // do not include already merged index files details in segment file.
+      if (!mergedAndInvalidIndexFiles.contains(file.getName())) {

Review comment:
       I have changed the method from `getInvalidAndMergedIndexFiles` to `getValidCarbonIndexFiles`. In `getValidCarbonIndexFiles`, I have added the empty list check.
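
       A hypothetical sketch of the renamed helper's shape (the name is from the comment above; the body is an assumption):

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.carbondata.core.datastore.filesystem.CarbonFile;

// Hypothetical: keep only index files not reported as merged/invalid;
// the empty-list check short-circuits the common case with nothing to drop.
public static CarbonFile[] getValidCarbonIndexFiles(CarbonFile[] indexFiles)
    throws IOException {
  List<String> mergedAndInvalid = getInvalidAndMergedIndexFiles(indexFiles);
  if (mergedAndInvalid.isEmpty()) {
    return indexFiles;
  }
  return Arrays.stream(indexFiles)
      .filter(file -> !mergedAndInvalid.contains(file.getName()))
      .toArray(CarbonFile[]::new);
}
```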

##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
##########
@@ -79,18 +83,30 @@ public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
   @Override
   public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException {
     Map<String, String> indexFiles;
-    if (segment.getSegmentFileName() == null) {
+    SegmentFileStore fileStore =
+        new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
+    if (fileStore.getSegmentFile() == null) {
       String path =
           CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
       indexFiles = new SegmentIndexFileStore().getMergeOrIndexFilesFromSegment(path);
     } else {
-      SegmentFileStore fileStore =
-          new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
       indexFiles = fileStore.getIndexOrMergeFiles();
       if (fileStore.getSegmentFile() != null) {
         segment.setSegmentMetaDataInfo(fileStore.getSegmentFile().getSegmentMetaDataInfo());
       }
     }
+    List<String> index = new ArrayList<>(indexFiles.keySet());

Review comment:
       Without copying into a list, I was getting a `ConcurrentModificationException`. The list is set to null at the end.
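
       The exception comes from mutating the map while iterating its own key set; copying the keys first avoids it. A standalone illustration (file names and the filter condition are hypothetical):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Map<String, String> indexFiles = new HashMap<>();
indexFiles.put("part-0.carbonindex", null);
indexFiles.put("0.carbonindexmerge", null);

// Unsafe: removing from the map while iterating keySet() throws
// ConcurrentModificationException.
//   for (String key : indexFiles.keySet()) { indexFiles.remove(key); }

// Safe: iterate over a copy of the keys, then drop the copy.
List<String> index = new ArrayList<>(indexFiles.keySet());
for (String key : index) {
  if (key.endsWith(".carbonindex")) {  // hypothetical filter condition
    indexFiles.remove(key);
  }
}
index = null;  // release the temporary copy, as the comment describes
```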

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1366,18 +1412,10 @@ public static void removeTempFolder(Map<String, FolderDetails> locationMap, Stri
   /**
    * This method returns the list of index/merge index files for a segment in carbonTable.
    */
-  public static Set<String> getIndexFilesListForSegment(Segment segment, String tablePath)
-      throws IOException {
+  public static Set<String> getIndexFilesListForSegment(Segment segment, String tablePath) {
     Set<String> indexFiles;
-    if (segment.getSegmentFileName() == null) {
-      String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segment.getSegmentNo());
-      indexFiles =
-          new SegmentIndexFileStore().getMergeOrIndexFilesFromSegment(segmentPath).keySet();
-    } else {
-      SegmentFileStore segmentFileStore =

Review comment:
       Reverted, and modified to also collect invalid index files.

##########
File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
##########
@@ -361,6 +361,11 @@ private CarbonCommonConstants() {
 
   public static final String CARBON_INDEX_SCHEMA_STORAGE_DATABASE = "DATABASE";
 
+  @CarbonProperty
+  public static final String CARBON_INDEXFILES_DELETETIME = "carbon.index.delete.time";

Review comment:
       Done. Removed `CARBON_INDEXFILES_DELETETIME` and modified the code to use `max.query.execution.time` instead.
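
       For context, a hedged sketch of reading that existing property (the constant names are assumed to be the ones in `CarbonCommonConstants`):

```java
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonProperties;

// Assumption: reuse the query-expiry window (in minutes) as the grace
// period before stale index files become eligible for deletion.
String configured = CarbonProperties.getInstance()
    .getProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME);
int maxQueryExecutionTime = configured != null
    ? Integer.parseInt(configured)
    : CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME;
```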

##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
##########
@@ -139,11 +141,20 @@ private void prepareLoadMetadata() {
     if (null == index) {
       index = new LinkedList<>();
     }
+    CarbonFile[] indexFiles = new CarbonFile[index.size()];
+    for (int i = 0; i < index.size(); i++) {
+      indexFiles[i] = FileFactory.getCarbonFile(index.get(i));
+    }
+    List<String> mergedIndexFiles = SegmentFileStore.getMergedIndexFiles(indexFiles);
     for (String indexPath : index) {
       if (indexPath.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {

Review comment:
       Done.

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -353,6 +356,79 @@ object CarbonStore {
     }
   }
 
+  /**
+   * Clean invalid and expired index files of carbon table.
+   *
+   * @param carbonTable CarbonTable
+   */
+  def cleanUpIndexFiles(carbonTable: CarbonTable): Unit = {
+    val validSegments = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+      .getValidAndInvalidSegments.getValidSegments.asScala.toList

Review comment:
       Done. Similarly made changes to use the same metadataDetails for `cleanUpDeltaFiles`.



