Login  Register

[GitHub] [carbondata] akashrn5 commented on a change in pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

Posted by GitBox on Oct 28, 2020; 8:43am
URL: http://apache-carbondata-dev-mailing-list-archive.168.s1.nabble.com/GitHub-carbondata-ShreelekhyaG-opened-a-new-pull-request-3988-WIP-Clean-index-files-when-clean-filesd-tp102150p102957.html


akashrn5 commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r513210534



##########
File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
##########
@@ -361,6 +361,11 @@ private CarbonCommonConstants() {
 
   public static final String CARBON_INDEX_SCHEMA_STORAGE_DATABASE = "DATABASE";
 
+  @CarbonProperty
+  public static final String CARBON_INDEXFILES_DELETETIME = "carbon.index.delete.time";

Review comment:
       1. Since this is user exposed property, millisecond level is not good, please change to seconds level.
   2.
   ```suggestion
     public static final String CARBON_INDEXFILES_DELETETIME_IN_SECONDS = "carbon.index.files.delete.time.seconds";
   ```
   3. please add the proper comments.

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -464,6 +472,38 @@ public boolean accept(CarbonFile file) {
     return null;
   }
 
+  public static List<String> getMergedIndexFiles(CarbonFile[] indexFiles) throws IOException {

Review comment:
       can you please add comment to this method with example, so that it will be clear why this required, now its little confusing. Also add the comments in caller if required. Then we will have one more review

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -306,11 +306,15 @@ public static boolean writeSegmentFile(CarbonTable carbonTable, Segment segment)
       folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
       folderDetails.setRelative(false);
       segmentFile.addPath(segment.getSegmentPath(), folderDetails);
+      List<String> mergedIndexFiles = getMergedIndexFiles(indexFiles);

Review comment:
       i think duplicate code is present in some three places, so please try if you can refactor it.

##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
##########
@@ -91,6 +95,19 @@ public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
         segment.setSegmentMetaDataInfo(fileStore.getSegmentFile().getSegmentMetaDataInfo());
       }
     }
+    List<String> index = new ArrayList<>(indexFiles.keySet());
+    CarbonFile[] carbonIndexFiles = new CarbonFile[index.size()];
+    for (int i = 0; i < index.size(); i++) {
+      carbonIndexFiles[i] = FileFactory.getCarbonFile(index.get(i));
+    }

Review comment:
       same as above

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -353,6 +356,79 @@ object CarbonStore {
     }
   }
 
+  /**
+   * Clean invalid and expired index files of carbon table.
+   *
+   * @param carbonTable CarbonTable

Review comment:
       remove variable desc, its straight forward, just keep method level comment

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -353,6 +356,79 @@ object CarbonStore {
     }
   }
 
+  /**
+   * Clean invalid and expired index files of carbon table.
+   *
+   * @param carbonTable CarbonTable
+   */
+  def cleanUpIndexFiles(carbonTable: CarbonTable): Unit = {
+    val validSegments = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+      .getValidAndInvalidSegments.getValidSegments.asScala.toList
+    validSegments.foreach( segment => {
+      val sfs: SegmentFileStore = new SegmentFileStore(carbonTable.getTablePath,
+        segment.getSegmentFileName)
+      var indexFiles = List[CarbonFile]()
+      if (carbonTable.isHivePartitionTable) {
+        val partitionSpecs = sfs.getPartitionSpecs.asScala.toList
+        val segmentName = segment.getSegmentFileName.replace(CarbonTablePath.SEGMENT_EXT, "")
+        for (partitionSpec <- partitionSpecs) {
+          var carbonIndexFiles = SegmentIndexFileStore
+            .getCarbonIndexFiles(partitionSpec.getLocation.toString, FileFactory.getConfiguration)

Review comment:
       `SegmentIndexFileStore.getCarbonIndexFiles` does list files at a path, it will be slow, why do you need to use this? can't we get the index files from `sfs`?

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -464,6 +472,38 @@ public boolean accept(CarbonFile file) {
     return null;
   }
 
+  public static List<String> getMergedIndexFiles(CarbonFile[] indexFiles) throws IOException {
+    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+    List<String> mergedIndexFiles = new ArrayList<>();
+    long lastModifiedTime = 0L;
+    long length = 0L;
+    CarbonFile validMergeIndexFile = null;
+    List<CarbonFile> mergeIndexCarbonFiles = new ArrayList<>();
+    for (CarbonFile file : indexFiles) {

Review comment:
       ```suggestion
       for (CarbonFile indexFile : indexFiles) {
   ```

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -464,6 +472,38 @@ public boolean accept(CarbonFile file) {
     return null;
   }
 
+  public static List<String> getMergedIndexFiles(CarbonFile[] indexFiles) throws IOException {
+    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+    List<String> mergedIndexFiles = new ArrayList<>();
+    long lastModifiedTime = 0L;
+    long length = 0L;
+    CarbonFile validMergeIndexFile = null;
+    List<CarbonFile> mergeIndexCarbonFiles = new ArrayList<>();
+    for (CarbonFile file : indexFiles) {
+      if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        indexFileStore.readMergeFile(file.getCanonicalPath());
+        Map<String, List<String>> carbonMergeFileToIndexFilesMap =
+            indexFileStore.getCarbonMergeFileToIndexFilesMap();
+        mergedIndexFiles.addAll(carbonMergeFileToIndexFilesMap.get(file.getCanonicalPath()));
+        // In case there are more than 1 mergeindex files present, get the latest one.
+        if (file.getLastModifiedTime() > lastModifiedTime || file.getLength() > length) {

Review comment:
       please add some detailed comment when this scenario can happen.

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -464,6 +472,38 @@ public boolean accept(CarbonFile file) {
     return null;
   }
 
+  public static List<String> getMergedIndexFiles(CarbonFile[] indexFiles) throws IOException {
+    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+    List<String> mergedIndexFiles = new ArrayList<>();
+    long lastModifiedTime = 0L;
+    long length = 0L;
+    CarbonFile validMergeIndexFile = null;
+    List<CarbonFile> mergeIndexCarbonFiles = new ArrayList<>();
+    for (CarbonFile file : indexFiles) {
+      if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        indexFileStore.readMergeFile(file.getCanonicalPath());
+        Map<String, List<String>> carbonMergeFileToIndexFilesMap =
+            indexFileStore.getCarbonMergeFileToIndexFilesMap();
+        mergedIndexFiles.addAll(carbonMergeFileToIndexFilesMap.get(file.getCanonicalPath()));
+        // In case there are more than 1 mergeindex files present, get the latest one.
+        if (file.getLastModifiedTime() > lastModifiedTime || file.getLength() > length) {
+          lastModifiedTime = file.getLastModifiedTime();
+          length = file.getLength();
+          validMergeIndexFile = file;
+        }
+        mergeIndexCarbonFiles.add(file);
+      }
+    }
+    if (mergeIndexCarbonFiles.size() > 1 && validMergeIndexFile != null) {
+      for (CarbonFile file : mergeIndexCarbonFiles) {

Review comment:
       why this loop required? because as i can see, you are already adding the merged indexFilenames to `mergedIndexFiles` in line 487.

##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
##########
@@ -139,11 +141,20 @@ private void prepareLoadMetadata() {
     if (null == index) {
       index = new LinkedList<>();
     }
+    CarbonFile[] indexFiles = new CarbonFile[index.size()];
+    for (int i = 0; i < index.size(); i++) {
+      indexFiles[i] = FileFactory.getCarbonFile(index.get(i));
+    }

Review comment:
       replace the for loop with lambda and map
   
   index.stream().map(Filefactory::getCarbonFile).toArray(CarbonFile::new);

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
##########
@@ -2460,11 +2460,18 @@ public static int isFilterPresent(byte[][] filterValues,
         File file = new File(segmentPath);
         File[] segmentFiles = file.listFiles();
         if (null != segmentFiles) {
+          CarbonFile[] indexFiles = new CarbonFile[segmentFiles.length];
+          for (int i = 0; i < segmentFiles.length; i++) {
+            indexFiles[i] = FileFactory.getCarbonFile(segmentFiles[i].getAbsolutePath());
+          }

Review comment:
       same as above, use lambda and map

##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
##########
@@ -139,11 +141,20 @@ private void prepareLoadMetadata() {
     if (null == index) {
       index = new LinkedList<>();
     }
+    CarbonFile[] indexFiles = new CarbonFile[index.size()];
+    for (int i = 0; i < index.size(); i++) {
+      indexFiles[i] = FileFactory.getCarbonFile(index.get(i));
+    }
+    List<String> mergedIndexFiles = SegmentFileStore.getMergedIndexFiles(indexFiles);
     for (String indexPath : index) {
       if (indexPath.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {

Review comment:
       you need to check for this file also right , whether it contains in `mergedIndexFiles `

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
##########
@@ -2460,11 +2460,18 @@ public static int isFilterPresent(byte[][] filterValues,
         File file = new File(segmentPath);
         File[] segmentFiles = file.listFiles();

Review comment:
       this is base code, but can you rename to `dataAndIndexFiles` ?

##########
File path: integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoInsertIntoTableTestCase.scala
##########
@@ -189,7 +189,7 @@ class PrestoInsertIntoTableTestCase
       "testtable")
     val carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier)
     val segmentFoldersLocation = CarbonTablePath.getPartitionDir(carbonTable.getTablePath)
-    assert(FileFactory.getCarbonFile(segmentFoldersLocation).listFiles(false).size() == 8)

Review comment:
       instead of changing the value of assert, its better if you can list valid files and assert

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -353,6 +356,79 @@ object CarbonStore {
     }
   }
 
+  /**
+   * Clean invalid and expired index files of carbon table.
+   *
+   * @param carbonTable CarbonTable
+   */
+  def cleanUpIndexFiles(carbonTable: CarbonTable): Unit = {
+    val validSegments = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+      .getValidAndInvalidSegments.getValidSegments.asScala.toList
+    validSegments.foreach( segment => {
+      val sfs: SegmentFileStore = new SegmentFileStore(carbonTable.getTablePath,
+        segment.getSegmentFileName)
+      var indexFiles = List[CarbonFile]()
+      if (carbonTable.isHivePartitionTable) {
+        val partitionSpecs = sfs.getPartitionSpecs.asScala.toList
+        val segmentName = segment.getSegmentFileName.replace(CarbonTablePath.SEGMENT_EXT, "")
+        for (partitionSpec <- partitionSpecs) {
+          var carbonIndexFiles = SegmentIndexFileStore
+            .getCarbonIndexFiles(partitionSpec.getLocation.toString, FileFactory.getConfiguration)
+            .toList
+          carbonIndexFiles = carbonIndexFiles.filter(x => x.getAbsolutePath
+            .contains(segmentName.substring(
+              segmentName.indexOf(CarbonCommonConstants.UNDERSCORE) + 1, segmentName.length)))
+          indexFiles = indexFiles ++ carbonIndexFiles
+          cleanUpIndexFilesForSegment(sfs, indexFiles)
+        }
+      } else {
+        val segmentPath: String = carbonTable.getSegmentPath(segment.getSegmentNo)
+        val carbonIndexFiles = SegmentIndexFileStore
+          .getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration)

Review comment:
       same as above

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -353,6 +356,79 @@ object CarbonStore {
     }
   }
 
+  /**
+   * Clean invalid and expired index files of carbon table.
+   *
+   * @param carbonTable CarbonTable
+   */
+  def cleanUpIndexFiles(carbonTable: CarbonTable): Unit = {
+    val validSegments = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+      .getValidAndInvalidSegments.getValidSegments.asScala.toList
+    validSegments.foreach( segment => {

Review comment:
       ```suggestion
       validSegments.foreach(segment => {
   ```

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
##########
@@ -308,6 +308,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
           val segmentMetaDataInfo = CommonLoadUtils.getSegmentMetaDataInfoFromAccumulator(
             mergedLoadNumber,
             segmentMetaDataAccumulator)
+          MergeIndexUtil.mergeIndexFilesOnCompaction(compactionCallableModel)

Review comment:
       no need to check `CompactionType.IUD_DELETE_DELTA` ?

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -353,6 +356,79 @@ object CarbonStore {
     }
   }
 
+  /**
+   * Clean invalid and expired index files of carbon table.
+   *
+   * @param carbonTable CarbonTable
+   */
+  def cleanUpIndexFiles(carbonTable: CarbonTable): Unit = {
+    val validSegments = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+      .getValidAndInvalidSegments.getValidSegments.asScala.toList

Review comment:
       `getValidAndInvalidSegments` will read table status again, but we are already reading the status once in `deleteLoadsAndUpdateMetadata`, move out from `deleteLoadsAndUpdateMetadata` and use same for all

##########
File path: integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
##########
@@ -167,6 +167,13 @@ object CarbonMergeFilesRDD {
             executorService.submit(new Runnable {
               override def run(): Unit = {
                 ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
+                val oldFolder = FileFactory.getCarbonFile(

Review comment:
       why this change required now? Is it fixing any bug?

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
##########
@@ -356,10 +357,6 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
                             s"${ carbonLoadModel.getTableName }")
       }
 
-      if (compactionType != CompactionType.IUD_DELETE_DELTA &&

Review comment:
       please check and revert, comment at line number 352

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##########
@@ -620,6 +609,9 @@ object CarbonDataRDDFactory {
             carbonLoadModel)
         OperationListenerBus.getInstance()
           .fireEvent(loadTablePreStatusUpdateEvent, operationContext)
+        val segmentFileName =

Review comment:
       mergeIndex call will be writing the segment file and updating the table status file right? why do we need here again?
   are you calling just to update the `segmentMetaDataInfo`?
   I think here, we need only if the merge index is false, please check once and confirm

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
##########
@@ -352,27 +343,20 @@ object SecondaryIndexCreator {
       var tableStatusUpdateForFailure = false
 
       if (successSISegments.nonEmpty && !isCompactionCall) {
-        tableStatusUpdateForSuccess = FileInternalUtil.updateTableStatus(
-          successSISegments,
-          secondaryIndexModel.carbonLoadModel.getDatabaseName,
-          secondaryIndexModel.secondaryIndex.indexName,
-          SegmentStatus.INSERT_IN_PROGRESS,
-          secondaryIndexModel.segmentIdToLoadStartTimeMapping,
-          segmentToLoadStartTimeMap,
-          indexCarbonTable,
-          secondaryIndexModel.sqlContext.sparkSession)
-
         // merge index files for success segments in case of only load
         CarbonMergeFilesRDD.mergeIndexFiles(secondaryIndexModel.sqlContext.sparkSession,
           successSISegments,
           segmentToLoadStartTimeMap,
           indexCarbonTable.getTablePath,
           indexCarbonTable, mergeIndexProperty = false)
-
         val loadMetadataDetails = SegmentStatusManager
           .readLoadMetadata(indexCarbonTable.getMetadataPath)
           .filter(loadMetadataDetail => successSISegments.contains(loadMetadataDetail.getLoadName))
-
+        for (loadMetadata <- loadMetadataDetails) {

Review comment:
       same as above

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
##########
@@ -94,21 +95,19 @@ object Compactor {
             forceAccessSegment, isCompactionCall = true,
             isLoadToFailedSISegments = false)
         allSegmentsLock ++= segmentLocks
-        CarbonInternalLoaderUtil.updateLoadMetadataWithMergeStatus(
-          indexCarbonTable,
-          loadsToMerge,
-          validSegments.head,
-          segmentToSegmentTimestampMap,
-          segmentIdToLoadStartTimeMapping(validSegments.head),
-          SegmentStatus.INSERT_IN_PROGRESS, 0L, List.empty.asJava)
 
         // merge index files
         CarbonMergeFilesRDD.mergeIndexFiles(sqlContext.sparkSession,
           secondaryIndexModel.validSegments,
           segmentToSegmentTimestampMap,
           indexCarbonTable.getTablePath,
           indexCarbonTable, mergeIndexProperty = false)
-
+        for (eachSegment <- secondaryIndexModel.validSegments) {

Review comment:
       `mergeIndexFiles` already writes the segment file and update the table status right? why do we need to write again?

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -353,6 +356,79 @@ object CarbonStore {
     }
   }
 
+  /**
+   * Clean invalid and expired index files of carbon table.
+   *
+   * @param carbonTable CarbonTable
+   */
+  def cleanUpIndexFiles(carbonTable: CarbonTable): Unit = {
+    val validSegments = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+      .getValidAndInvalidSegments.getValidSegments.asScala.toList
+    validSegments.foreach( segment => {
+      val sfs: SegmentFileStore = new SegmentFileStore(carbonTable.getTablePath,
+        segment.getSegmentFileName)
+      var indexFiles = List[CarbonFile]()
+      if (carbonTable.isHivePartitionTable) {
+        val partitionSpecs = sfs.getPartitionSpecs.asScala.toList
+        val segmentName = segment.getSegmentFileName.replace(CarbonTablePath.SEGMENT_EXT, "")
+        for (partitionSpec <- partitionSpecs) {
+          var carbonIndexFiles = SegmentIndexFileStore
+            .getCarbonIndexFiles(partitionSpec.getLocation.toString, FileFactory.getConfiguration)
+            .toList
+          carbonIndexFiles = carbonIndexFiles.filter(x => x.getAbsolutePath
+            .contains(segmentName.substring(
+              segmentName.indexOf(CarbonCommonConstants.UNDERSCORE) + 1, segmentName.length)))
+          indexFiles = indexFiles ++ carbonIndexFiles
+          cleanUpIndexFilesForSegment(sfs, indexFiles)
+        }
+      } else {
+        val segmentPath: String = carbonTable.getSegmentPath(segment.getSegmentNo)
+        val carbonIndexFiles = SegmentIndexFileStore
+          .getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration)
+          .toList
+        indexFiles = indexFiles ++ carbonIndexFiles
+        cleanUpIndexFilesForSegment(sfs, indexFiles)
+      }
+    })
+  }
+
+  /**
+   * delete invalid and expired index files of segment
+   *
+   * @param sfs SegmentFileStore
+   * @param indexFiles List of carbon index files
+   */
+  def cleanUpIndexFilesForSegment(sfs: SegmentFileStore,
+      indexFiles: List[CarbonFile]): Unit = {
+    val indexFileStore: SegmentIndexFileStore = new SegmentIndexFileStore
+    indexFileStore.readAllIIndexOfSegment(sfs.getSegmentFile, sfs.getTablePath,
+      SegmentStatus.SUCCESS, true)
+    val carbonMergeFileToIndexFilesMap = indexFileStore.getCarbonMergeFileToIndexFilesMap.asScala
+    for (file <- indexFiles) {
+      for ((key, value) <- carbonMergeFileToIndexFilesMap) {
+        val isInvalidMergeFile =
+          file.getAbsolutePath.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT) &&
+          !key.equalsIgnoreCase(file.getAbsolutePath)
+        val isInvalidIndexFile = value.contains(file.getAbsolutePath
+          .substring(file.getAbsolutePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR) + 1))
+        if (isInvalidIndexFile || isInvalidMergeFile) {
+          val agingTime: Long = System.currentTimeMillis - CarbonProperties.getInstance
+            .getProperty(CarbonCommonConstants.CARBON_INDEXFILES_DELETETIME,
+              CarbonCommonConstants.CARBON_INDEXFILES_DELETETIME_DEFAULT).toLong
+          if (file.getLastModifiedTime < agingTime) {
+            try {
+              CarbonUtil.deleteFoldersAndFiles(file)
+              LOGGER.info("deleted file + " + file.getPath)
+            } catch {
+              case e@(_: IOException | _: InterruptedException) =>
+                LOGGER.error("Deletion of index files failed.")

Review comment:
       file name also can be added in log for better traceability

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
##########
@@ -98,7 +98,7 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     sql("delete from table addsegment1 where segment.id in (2)")
     sql("clean files for table addsegment1")
     val oldFolder = FileFactory.getCarbonFile(newPath)
-    assert(oldFolder.listFiles.length == 2,
+    assert(oldFolder.listFiles.length == 3,

Review comment:
       instead of changing the count, can we just assert for the valid files? like the same comment given before. Please check for all the similar changes

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
##########
@@ -352,27 +343,20 @@ object SecondaryIndexCreator {
       var tableStatusUpdateForFailure = false
 
       if (successSISegments.nonEmpty && !isCompactionCall) {
-        tableStatusUpdateForSuccess = FileInternalUtil.updateTableStatus(
-          successSISegments,
-          secondaryIndexModel.carbonLoadModel.getDatabaseName,
-          secondaryIndexModel.secondaryIndex.indexName,
-          SegmentStatus.INSERT_IN_PROGRESS,
-          secondaryIndexModel.segmentIdToLoadStartTimeMapping,
-          segmentToLoadStartTimeMap,
-          indexCarbonTable,
-          secondaryIndexModel.sqlContext.sparkSession)
-
         // merge index files for success segments in case of only load
         CarbonMergeFilesRDD.mergeIndexFiles(secondaryIndexModel.sqlContext.sparkSession,
           successSISegments,
           segmentToLoadStartTimeMap,
           indexCarbonTable.getTablePath,
           indexCarbonTable, mergeIndexProperty = false)
-
         val loadMetadataDetails = SegmentStatusManager
           .readLoadMetadata(indexCarbonTable.getMetadataPath)
           .filter(loadMetadataDetail => successSISegments.contains(loadMetadataDetail.getLoadName))
-
+        for (loadMetadata <- loadMetadataDetails) {
+          SegmentFileStore
+            .writeSegmentFile(indexCarbonTable, loadMetadata.getLoadName,
+              String.valueOf(loadMetadata.getLoadStartTime))
+        }

Review comment:
       `mergeDataFilesSISegments` API, first does merge small files, then it does operations like below
   
   `mergedata -> writesegmentfile for merged segments -> update table status -> adddatasizeinto meta(one more table status update) -> mergeIndex(for merged segments, mergeSegmentAPI itself writes the segment file and update the status)`
   
   This you can reduce to max, like the load case. Please check




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]