[GitHub] [carbondata] ShreelekhyaG opened a new pull request #3988: [WIP] Clean index files when clean files command executed

classic Classic list List threaded Threaded
171 messages Options
12345 ... 9
Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox

CarbonDataQA1 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-717267646


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4707/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-717268180


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2950/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] akashrn5 commented on a change in pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

akashrn5 commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r513210534



##########
File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
##########
@@ -361,6 +361,11 @@ private CarbonCommonConstants() {
 
   public static final String CARBON_INDEX_SCHEMA_STORAGE_DATABASE = "DATABASE";
 
+  @CarbonProperty
+  public static final String CARBON_INDEXFILES_DELETETIME = "carbon.index.delete.time";

Review comment:
       1. Since this is user exposed property, millisecond level is not good, please change to seconds level.
   2.
   ```suggestion
     public static final String CARBON_INDEXFILES_DELETETIME_IN_SECONDS = "carbon.index.files.delete.time.seconds";
   ```
   3. please add the proper comments.

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -464,6 +472,38 @@ public boolean accept(CarbonFile file) {
     return null;
   }
 
+  public static List<String> getMergedIndexFiles(CarbonFile[] indexFiles) throws IOException {

Review comment:
       can you please add comment to this method with example, so that it will be clear why this required, now its little confusing. Also add the comments in caller if required. Then we will have one more review

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -306,11 +306,15 @@ public static boolean writeSegmentFile(CarbonTable carbonTable, Segment segment)
       folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
       folderDetails.setRelative(false);
       segmentFile.addPath(segment.getSegmentPath(), folderDetails);
+      List<String> mergedIndexFiles = getMergedIndexFiles(indexFiles);

Review comment:
       i think duplicate code is present in some three places, so please try if you can refactor it.

##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
##########
@@ -91,6 +95,19 @@ public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
         segment.setSegmentMetaDataInfo(fileStore.getSegmentFile().getSegmentMetaDataInfo());
       }
     }
+    List<String> index = new ArrayList<>(indexFiles.keySet());
+    CarbonFile[] carbonIndexFiles = new CarbonFile[index.size()];
+    for (int i = 0; i < index.size(); i++) {
+      carbonIndexFiles[i] = FileFactory.getCarbonFile(index.get(i));
+    }

Review comment:
       same as above

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -353,6 +356,79 @@ object CarbonStore {
     }
   }
 
+  /**
+   * Clean invalid and expired index files of carbon table.
+   *
+   * @param carbonTable CarbonTable

Review comment:
       remove variable desc, its straight forward, just keep method level comment

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -353,6 +356,79 @@ object CarbonStore {
     }
   }
 
+  /**
+   * Clean invalid and expired index files of carbon table.
+   *
+   * @param carbonTable CarbonTable
+   */
+  def cleanUpIndexFiles(carbonTable: CarbonTable): Unit = {
+    val validSegments = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+      .getValidAndInvalidSegments.getValidSegments.asScala.toList
+    validSegments.foreach( segment => {
+      val sfs: SegmentFileStore = new SegmentFileStore(carbonTable.getTablePath,
+        segment.getSegmentFileName)
+      var indexFiles = List[CarbonFile]()
+      if (carbonTable.isHivePartitionTable) {
+        val partitionSpecs = sfs.getPartitionSpecs.asScala.toList
+        val segmentName = segment.getSegmentFileName.replace(CarbonTablePath.SEGMENT_EXT, "")
+        for (partitionSpec <- partitionSpecs) {
+          var carbonIndexFiles = SegmentIndexFileStore
+            .getCarbonIndexFiles(partitionSpec.getLocation.toString, FileFactory.getConfiguration)

Review comment:
       `SegmentIndexFileStore.getCarbonIndexFiles` does list files at a path, it will be slow, why do you need to use this? can't we get the index files from `sfs`?

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -464,6 +472,38 @@ public boolean accept(CarbonFile file) {
     return null;
   }
 
+  public static List<String> getMergedIndexFiles(CarbonFile[] indexFiles) throws IOException {
+    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+    List<String> mergedIndexFiles = new ArrayList<>();
+    long lastModifiedTime = 0L;
+    long length = 0L;
+    CarbonFile validMergeIndexFile = null;
+    List<CarbonFile> mergeIndexCarbonFiles = new ArrayList<>();
+    for (CarbonFile file : indexFiles) {

Review comment:
       ```suggestion
       for (CarbonFile indexFile : indexFiles) {
   ```

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -464,6 +472,38 @@ public boolean accept(CarbonFile file) {
     return null;
   }
 
+  public static List<String> getMergedIndexFiles(CarbonFile[] indexFiles) throws IOException {
+    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+    List<String> mergedIndexFiles = new ArrayList<>();
+    long lastModifiedTime = 0L;
+    long length = 0L;
+    CarbonFile validMergeIndexFile = null;
+    List<CarbonFile> mergeIndexCarbonFiles = new ArrayList<>();
+    for (CarbonFile file : indexFiles) {
+      if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        indexFileStore.readMergeFile(file.getCanonicalPath());
+        Map<String, List<String>> carbonMergeFileToIndexFilesMap =
+            indexFileStore.getCarbonMergeFileToIndexFilesMap();
+        mergedIndexFiles.addAll(carbonMergeFileToIndexFilesMap.get(file.getCanonicalPath()));
+        // In case there are more than 1 mergeindex files present, get the latest one.
+        if (file.getLastModifiedTime() > lastModifiedTime || file.getLength() > length) {

Review comment:
       please add some detailed comment when this scenario can happen.

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -464,6 +472,38 @@ public boolean accept(CarbonFile file) {
     return null;
   }
 
+  public static List<String> getMergedIndexFiles(CarbonFile[] indexFiles) throws IOException {
+    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+    List<String> mergedIndexFiles = new ArrayList<>();
+    long lastModifiedTime = 0L;
+    long length = 0L;
+    CarbonFile validMergeIndexFile = null;
+    List<CarbonFile> mergeIndexCarbonFiles = new ArrayList<>();
+    for (CarbonFile file : indexFiles) {
+      if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        indexFileStore.readMergeFile(file.getCanonicalPath());
+        Map<String, List<String>> carbonMergeFileToIndexFilesMap =
+            indexFileStore.getCarbonMergeFileToIndexFilesMap();
+        mergedIndexFiles.addAll(carbonMergeFileToIndexFilesMap.get(file.getCanonicalPath()));
+        // In case there are more than 1 mergeindex files present, get the latest one.
+        if (file.getLastModifiedTime() > lastModifiedTime || file.getLength() > length) {
+          lastModifiedTime = file.getLastModifiedTime();
+          length = file.getLength();
+          validMergeIndexFile = file;
+        }
+        mergeIndexCarbonFiles.add(file);
+      }
+    }
+    if (mergeIndexCarbonFiles.size() > 1 && validMergeIndexFile != null) {
+      for (CarbonFile file : mergeIndexCarbonFiles) {

Review comment:
       why this loop required? because as i can see, you are already adding the merged indexFilenames to `mergedIndexFiles` in line 487.

##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
##########
@@ -139,11 +141,20 @@ private void prepareLoadMetadata() {
     if (null == index) {
       index = new LinkedList<>();
     }
+    CarbonFile[] indexFiles = new CarbonFile[index.size()];
+    for (int i = 0; i < index.size(); i++) {
+      indexFiles[i] = FileFactory.getCarbonFile(index.get(i));
+    }

Review comment:
       replace the for loop with lambda and map
   
   index.stream().map(Filefactory::getCarbonFile).toArray(CarbonFile::new);

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
##########
@@ -2460,11 +2460,18 @@ public static int isFilterPresent(byte[][] filterValues,
         File file = new File(segmentPath);
         File[] segmentFiles = file.listFiles();
         if (null != segmentFiles) {
+          CarbonFile[] indexFiles = new CarbonFile[segmentFiles.length];
+          for (int i = 0; i < segmentFiles.length; i++) {
+            indexFiles[i] = FileFactory.getCarbonFile(segmentFiles[i].getAbsolutePath());
+          }

Review comment:
       same as above, use lambda and map

##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
##########
@@ -139,11 +141,20 @@ private void prepareLoadMetadata() {
     if (null == index) {
       index = new LinkedList<>();
     }
+    CarbonFile[] indexFiles = new CarbonFile[index.size()];
+    for (int i = 0; i < index.size(); i++) {
+      indexFiles[i] = FileFactory.getCarbonFile(index.get(i));
+    }
+    List<String> mergedIndexFiles = SegmentFileStore.getMergedIndexFiles(indexFiles);
     for (String indexPath : index) {
       if (indexPath.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {

Review comment:
       you need to check for this file also right , whether it contains in `mergedIndexFiles `

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
##########
@@ -2460,11 +2460,18 @@ public static int isFilterPresent(byte[][] filterValues,
         File file = new File(segmentPath);
         File[] segmentFiles = file.listFiles();

Review comment:
       this is base code, but can you rename to `dataAndIndexFiles` ?

##########
File path: integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoInsertIntoTableTestCase.scala
##########
@@ -189,7 +189,7 @@ class PrestoInsertIntoTableTestCase
       "testtable")
     val carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier)
     val segmentFoldersLocation = CarbonTablePath.getPartitionDir(carbonTable.getTablePath)
-    assert(FileFactory.getCarbonFile(segmentFoldersLocation).listFiles(false).size() == 8)

Review comment:
       instead of changing the value of assert, its better if you can list valid files and assert

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -353,6 +356,79 @@ object CarbonStore {
     }
   }
 
+  /**
+   * Clean invalid and expired index files of carbon table.
+   *
+   * @param carbonTable CarbonTable
+   */
+  def cleanUpIndexFiles(carbonTable: CarbonTable): Unit = {
+    val validSegments = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+      .getValidAndInvalidSegments.getValidSegments.asScala.toList
+    validSegments.foreach( segment => {
+      val sfs: SegmentFileStore = new SegmentFileStore(carbonTable.getTablePath,
+        segment.getSegmentFileName)
+      var indexFiles = List[CarbonFile]()
+      if (carbonTable.isHivePartitionTable) {
+        val partitionSpecs = sfs.getPartitionSpecs.asScala.toList
+        val segmentName = segment.getSegmentFileName.replace(CarbonTablePath.SEGMENT_EXT, "")
+        for (partitionSpec <- partitionSpecs) {
+          var carbonIndexFiles = SegmentIndexFileStore
+            .getCarbonIndexFiles(partitionSpec.getLocation.toString, FileFactory.getConfiguration)
+            .toList
+          carbonIndexFiles = carbonIndexFiles.filter(x => x.getAbsolutePath
+            .contains(segmentName.substring(
+              segmentName.indexOf(CarbonCommonConstants.UNDERSCORE) + 1, segmentName.length)))
+          indexFiles = indexFiles ++ carbonIndexFiles
+          cleanUpIndexFilesForSegment(sfs, indexFiles)
+        }
+      } else {
+        val segmentPath: String = carbonTable.getSegmentPath(segment.getSegmentNo)
+        val carbonIndexFiles = SegmentIndexFileStore
+          .getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration)

Review comment:
       same as above

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -353,6 +356,79 @@ object CarbonStore {
     }
   }
 
+  /**
+   * Clean invalid and expired index files of carbon table.
+   *
+   * @param carbonTable CarbonTable
+   */
+  def cleanUpIndexFiles(carbonTable: CarbonTable): Unit = {
+    val validSegments = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+      .getValidAndInvalidSegments.getValidSegments.asScala.toList
+    validSegments.foreach( segment => {

Review comment:
       ```suggestion
       validSegments.foreach(segment => {
   ```

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
##########
@@ -308,6 +308,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
           val segmentMetaDataInfo = CommonLoadUtils.getSegmentMetaDataInfoFromAccumulator(
             mergedLoadNumber,
             segmentMetaDataAccumulator)
+          MergeIndexUtil.mergeIndexFilesOnCompaction(compactionCallableModel)

Review comment:
       no need to check `CompactionType.IUD_DELETE_DELTA` ?

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -353,6 +356,79 @@ object CarbonStore {
     }
   }
 
+  /**
+   * Clean invalid and expired index files of carbon table.
+   *
+   * @param carbonTable CarbonTable
+   */
+  def cleanUpIndexFiles(carbonTable: CarbonTable): Unit = {
+    val validSegments = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+      .getValidAndInvalidSegments.getValidSegments.asScala.toList

Review comment:
       `getValidAndInvalidSegments` will read table status again, but we are already reading the status once in `deleteLoadsAndUpdateMetadata`, move out from `deleteLoadsAndUpdateMetadata` and use same for all

##########
File path: integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
##########
@@ -167,6 +167,13 @@ object CarbonMergeFilesRDD {
             executorService.submit(new Runnable {
               override def run(): Unit = {
                 ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
+                val oldFolder = FileFactory.getCarbonFile(

Review comment:
       why this change required now? Is it fixing any bug?

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
##########
@@ -356,10 +357,6 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
                             s"${ carbonLoadModel.getTableName }")
       }
 
-      if (compactionType != CompactionType.IUD_DELETE_DELTA &&

Review comment:
       please check and revert, comment at line number 352

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##########
@@ -620,6 +609,9 @@ object CarbonDataRDDFactory {
             carbonLoadModel)
         OperationListenerBus.getInstance()
           .fireEvent(loadTablePreStatusUpdateEvent, operationContext)
+        val segmentFileName =

Review comment:
       mergeIndex call will be writing the segment file and updating the table status file right? why do we need here again?
   are you calling just to update the `segmentMetaDataInfo`?
   I think here, we need only if the merge index is false, please check once and confirm

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
##########
@@ -352,27 +343,20 @@ object SecondaryIndexCreator {
       var tableStatusUpdateForFailure = false
 
       if (successSISegments.nonEmpty && !isCompactionCall) {
-        tableStatusUpdateForSuccess = FileInternalUtil.updateTableStatus(
-          successSISegments,
-          secondaryIndexModel.carbonLoadModel.getDatabaseName,
-          secondaryIndexModel.secondaryIndex.indexName,
-          SegmentStatus.INSERT_IN_PROGRESS,
-          secondaryIndexModel.segmentIdToLoadStartTimeMapping,
-          segmentToLoadStartTimeMap,
-          indexCarbonTable,
-          secondaryIndexModel.sqlContext.sparkSession)
-
         // merge index files for success segments in case of only load
         CarbonMergeFilesRDD.mergeIndexFiles(secondaryIndexModel.sqlContext.sparkSession,
           successSISegments,
           segmentToLoadStartTimeMap,
           indexCarbonTable.getTablePath,
           indexCarbonTable, mergeIndexProperty = false)
-
         val loadMetadataDetails = SegmentStatusManager
           .readLoadMetadata(indexCarbonTable.getMetadataPath)
           .filter(loadMetadataDetail => successSISegments.contains(loadMetadataDetail.getLoadName))
-
+        for (loadMetadata <- loadMetadataDetails) {

Review comment:
       same as above

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
##########
@@ -94,21 +95,19 @@ object Compactor {
             forceAccessSegment, isCompactionCall = true,
             isLoadToFailedSISegments = false)
         allSegmentsLock ++= segmentLocks
-        CarbonInternalLoaderUtil.updateLoadMetadataWithMergeStatus(
-          indexCarbonTable,
-          loadsToMerge,
-          validSegments.head,
-          segmentToSegmentTimestampMap,
-          segmentIdToLoadStartTimeMapping(validSegments.head),
-          SegmentStatus.INSERT_IN_PROGRESS, 0L, List.empty.asJava)
 
         // merge index files
         CarbonMergeFilesRDD.mergeIndexFiles(sqlContext.sparkSession,
           secondaryIndexModel.validSegments,
           segmentToSegmentTimestampMap,
           indexCarbonTable.getTablePath,
           indexCarbonTable, mergeIndexProperty = false)
-
+        for (eachSegment <- secondaryIndexModel.validSegments) {

Review comment:
       `mergeIndexFiles` already writes the segment file and update the table status right? why do we need to write again?

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -353,6 +356,79 @@ object CarbonStore {
     }
   }
 
+  /**
+   * Clean invalid and expired index files of carbon table.
+   *
+   * @param carbonTable CarbonTable
+   */
+  def cleanUpIndexFiles(carbonTable: CarbonTable): Unit = {
+    val validSegments = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+      .getValidAndInvalidSegments.getValidSegments.asScala.toList
+    validSegments.foreach( segment => {
+      val sfs: SegmentFileStore = new SegmentFileStore(carbonTable.getTablePath,
+        segment.getSegmentFileName)
+      var indexFiles = List[CarbonFile]()
+      if (carbonTable.isHivePartitionTable) {
+        val partitionSpecs = sfs.getPartitionSpecs.asScala.toList
+        val segmentName = segment.getSegmentFileName.replace(CarbonTablePath.SEGMENT_EXT, "")
+        for (partitionSpec <- partitionSpecs) {
+          var carbonIndexFiles = SegmentIndexFileStore
+            .getCarbonIndexFiles(partitionSpec.getLocation.toString, FileFactory.getConfiguration)
+            .toList
+          carbonIndexFiles = carbonIndexFiles.filter(x => x.getAbsolutePath
+            .contains(segmentName.substring(
+              segmentName.indexOf(CarbonCommonConstants.UNDERSCORE) + 1, segmentName.length)))
+          indexFiles = indexFiles ++ carbonIndexFiles
+          cleanUpIndexFilesForSegment(sfs, indexFiles)
+        }
+      } else {
+        val segmentPath: String = carbonTable.getSegmentPath(segment.getSegmentNo)
+        val carbonIndexFiles = SegmentIndexFileStore
+          .getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration)
+          .toList
+        indexFiles = indexFiles ++ carbonIndexFiles
+        cleanUpIndexFilesForSegment(sfs, indexFiles)
+      }
+    })
+  }
+
+  /**
+   * delete invalid and expired index files of segment
+   *
+   * @param sfs SegmentFileStore
+   * @param indexFiles List of carbon index files
+   */
+  def cleanUpIndexFilesForSegment(sfs: SegmentFileStore,
+      indexFiles: List[CarbonFile]): Unit = {
+    val indexFileStore: SegmentIndexFileStore = new SegmentIndexFileStore
+    indexFileStore.readAllIIndexOfSegment(sfs.getSegmentFile, sfs.getTablePath,
+      SegmentStatus.SUCCESS, true)
+    val carbonMergeFileToIndexFilesMap = indexFileStore.getCarbonMergeFileToIndexFilesMap.asScala
+    for (file <- indexFiles) {
+      for ((key, value) <- carbonMergeFileToIndexFilesMap) {
+        val isInvalidMergeFile =
+          file.getAbsolutePath.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT) &&
+          !key.equalsIgnoreCase(file.getAbsolutePath)
+        val isInvalidIndexFile = value.contains(file.getAbsolutePath
+          .substring(file.getAbsolutePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR) + 1))
+        if (isInvalidIndexFile || isInvalidMergeFile) {
+          val agingTime: Long = System.currentTimeMillis - CarbonProperties.getInstance
+            .getProperty(CarbonCommonConstants.CARBON_INDEXFILES_DELETETIME,
+              CarbonCommonConstants.CARBON_INDEXFILES_DELETETIME_DEFAULT).toLong
+          if (file.getLastModifiedTime < agingTime) {
+            try {
+              CarbonUtil.deleteFoldersAndFiles(file)
+              LOGGER.info("deleted file + " + file.getPath)
+            } catch {
+              case e@(_: IOException | _: InterruptedException) =>
+                LOGGER.error("Deletion of index files failed.")

Review comment:
       file name also can be added in log for better traceability

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
##########
@@ -98,7 +98,7 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     sql("delete from table addsegment1 where segment.id in (2)")
     sql("clean files for table addsegment1")
     val oldFolder = FileFactory.getCarbonFile(newPath)
-    assert(oldFolder.listFiles.length == 2,
+    assert(oldFolder.listFiles.length == 3,

Review comment:
       instead of changing the count, can we just assert for the valid files? like the same comment given before. Please check for all the similar changes

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
##########
@@ -352,27 +343,20 @@ object SecondaryIndexCreator {
       var tableStatusUpdateForFailure = false
 
       if (successSISegments.nonEmpty && !isCompactionCall) {
-        tableStatusUpdateForSuccess = FileInternalUtil.updateTableStatus(
-          successSISegments,
-          secondaryIndexModel.carbonLoadModel.getDatabaseName,
-          secondaryIndexModel.secondaryIndex.indexName,
-          SegmentStatus.INSERT_IN_PROGRESS,
-          secondaryIndexModel.segmentIdToLoadStartTimeMapping,
-          segmentToLoadStartTimeMap,
-          indexCarbonTable,
-          secondaryIndexModel.sqlContext.sparkSession)
-
         // merge index files for success segments in case of only load
         CarbonMergeFilesRDD.mergeIndexFiles(secondaryIndexModel.sqlContext.sparkSession,
           successSISegments,
           segmentToLoadStartTimeMap,
           indexCarbonTable.getTablePath,
           indexCarbonTable, mergeIndexProperty = false)
-
         val loadMetadataDetails = SegmentStatusManager
           .readLoadMetadata(indexCarbonTable.getMetadataPath)
           .filter(loadMetadataDetail => successSISegments.contains(loadMetadataDetail.getLoadName))
-
+        for (loadMetadata <- loadMetadataDetails) {
+          SegmentFileStore
+            .writeSegmentFile(indexCarbonTable, loadMetadata.getLoadName,
+              String.valueOf(loadMetadata.getLoadStartTime))
+        }

Review comment:
       `mergeDataFilesSISegments` API, first does merge small files, then it does operations like below
   
   `mergedata -> writesegmentfile for merged segments -> update table status -> adddatasizeinto meta(one more table status update) -> mergeIndex(for merged segments, mergeSegmentAPI itself writes the segment file and update the status)`
   
   This you can reduce to max, like the load case. Please check




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] QiangCai commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

QiangCai commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-719134288


   In the description, after it implements point 2, it should immediately delete the index file. So point 1 is no needed, right?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-720655263






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] ShreelekhyaG commented on a change in pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r515750468



##########
File path: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
##########
@@ -368,12 +373,21 @@ public static void getCarbonIndexFilesRecursively(CarbonFile carbonFile,
     return carbonFile.listFiles(new CarbonFileFilter() {
       @Override
       public boolean accept(CarbonFile file) {
-        return ((file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
-            .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) && file.getSize() > 0);
+        return (!oldIndexAndMergeIndexFiles.contains(file.getAbsolutePath()) && (
+            file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
+                .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) && file.getSize() > 0);
       }
     });
   }
 
+  public static List<String> getOldIndexAndMergeIndexFiles() {
+    return oldIndexAndMergeIndexFiles;
+  }
+
+  public static void setOldIndexAndMergeIndexFiles(List<String> oldIndexAndMergeIndexFiles) {

Review comment:
       While writing merge index file /segment file, it gets index files from segment directory. And during SI small files merge step, we will have `old.index`, `new.index` files and only `new.index` is valid for merge step and writing segment file. For maintable also we could store and use like this for normal load but in few cases like add segment/reading from external location, we will have to read mergeindex file to identify invalid index files. And for clean files, I'm getting all index files from the segment directory and eliminate which are not present in segment file.

##########
File path: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
##########
@@ -368,12 +373,21 @@ public static void getCarbonIndexFilesRecursively(CarbonFile carbonFile,
     return carbonFile.listFiles(new CarbonFileFilter() {

Review comment:
       ok removed

##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
##########
@@ -139,11 +141,19 @@ private void prepareLoadMetadata() {
     if (null == index) {

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
##########
@@ -139,11 +141,19 @@ private void prepareLoadMetadata() {
     if (null == index) {
       index = new LinkedList<>();
     }
+    CarbonFile[] carbonIndexFiles =
+        index.stream().map(FileFactory::getCarbonFile).toArray(CarbonFile[]::new);
+    List<String> mergedIndexFiles =
+        SegmentFileStore.getInvalidAndMergedIndexFiles(carbonIndexFiles);
     for (String indexPath : index) {

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
##########
@@ -79,18 +83,30 @@ public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
   @Override
   public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException {
     Map<String, String> indexFiles;
-    if (segment.getSegmentFileName() == null) {
+    SegmentFileStore fileStore =
+        new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
+    if (fileStore.getSegmentFile() == null) {
       String path =
           CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
       indexFiles = new SegmentIndexFileStore().getMergeOrIndexFilesFromSegment(path);
     } else {
-      SegmentFileStore fileStore =
-          new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
       indexFiles = fileStore.getIndexOrMergeFiles();
       if (fileStore.getSegmentFile() != null) {
         segment.setSegmentMetaDataInfo(fileStore.getSegmentFile().getSegmentMetaDataInfo());
       }
     }
+    List<String> index = new ArrayList<>(indexFiles.keySet());
+    CarbonFile[] carbonIndexFiles =
+        index.stream().map(FileFactory::getCarbonFile).toArray(CarbonFile[]::new);

Review comment:
       done. For `getInvalidAndMergedIndexFiles`, it takes `List<String>` as argument and returns `List<String>`.

##########
File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
##########
@@ -95,22 +97,20 @@ private String mergeCarbonIndexFilesOfSegment(String segmentId,
               indexFilesInPartition.add(indexCarbonFile);
             }
           }
-          indexFiles = indexFilesInPartition.toArray(new CarbonFile[indexFilesInPartition.size()]);
+          indexFiles = indexFilesInPartition;
         } else {
-          indexFiles = indexCarbonFiles.toArray(new CarbonFile[indexCarbonFiles.size()]);
+          indexFiles = indexCarbonFiles;
         }
+      }
+      if (indexFiles.isEmpty() || indexFileNamesTobeAdded != null) {
+        return writeMergeIndexFileBasedOnSegmentFolder(indexFileNamesTobeAdded,
+            readFileFooterFromCarbonDataFile, segmentPath,
+            indexFiles.toArray(new CarbonFile[indexFiles.size()]), segmentId);
       } else {
-        indexFiles =
+        CarbonFile[] indexFilesFromFile =

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
##########
@@ -95,22 +97,20 @@ private String mergeCarbonIndexFilesOfSegment(String segmentId,
               indexFilesInPartition.add(indexCarbonFile);
             }
           }
-          indexFiles = indexFilesInPartition.toArray(new CarbonFile[indexFilesInPartition.size()]);
+          indexFiles = indexFilesInPartition;
         } else {
-          indexFiles = indexCarbonFiles.toArray(new CarbonFile[indexCarbonFiles.size()]);
+          indexFiles = indexCarbonFiles;
         }
+      }
+      if (indexFiles.isEmpty() || indexFileNamesTobeAdded != null) {
+        return writeMergeIndexFileBasedOnSegmentFolder(indexFileNamesTobeAdded,
+            readFileFooterFromCarbonDataFile, segmentPath,
+            indexFiles.toArray(new CarbonFile[indexFiles.size()]), segmentId);

Review comment:
       done

##########
File path: integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoInsertIntoTableTestCase.scala
##########
@@ -188,8 +188,22 @@ class PrestoInsertIntoTableTestCase
     val absoluteTableIdentifier: AbsoluteTableIdentifier = getAbsoluteIdentifier("testdb",
       "testtable")
     val carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier)
-    val segmentFoldersLocation = CarbonTablePath.getPartitionDir(carbonTable.getTablePath)
-    assert(FileFactory.getCarbonFile(segmentFoldersLocation).listFiles(false).size() == 8)
+    val segment0 = CarbonTablePath
+      .getSegmentPath(carbonTable.getAbsoluteTableIdentifier.getTablePath, "0")

Review comment:
       done

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
##########
@@ -310,20 +284,22 @@ object SecondaryIndexUtil {
   /**
    * This method deletes the old index files or merge index file after data files merge
    */
-  private def deleteOldIndexOrMergeIndexFiles(
+  private def collectOldIndexOrMergeIndexFiles(
       factTimeStamp: Long,
       validSegments: util.List[Segment],
       indexCarbonTable: CarbonTable): Unit = {
+    val oldIndexAndMergeIndexFiles = new util.ArrayList[String]
     // delete the index/merge index carbonFile of old data files

Review comment:
       done

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
##########
@@ -310,20 +284,22 @@ object SecondaryIndexUtil {
   /**
    * This method deletes the old index files or merge index file after data files merge
    */
-  private def deleteOldIndexOrMergeIndexFiles(
+  private def collectOldIndexOrMergeIndexFiles(
       factTimeStamp: Long,
       validSegments: util.List[Segment],
       indexCarbonTable: CarbonTable): Unit = {
+    val oldIndexAndMergeIndexFiles = new util.ArrayList[String]
     // delete the index/merge index carbonFile of old data files
     validSegments.asScala.foreach { segment =>
       SegmentFileStore.getIndexFilesListForSegment(segment, indexCarbonTable.getTablePath)
         .asScala
         .foreach { indexFile =>
           if (DataFileUtil.getTimeStampFromFileName(indexFile).toLong < factTimeStamp) {
-            FileFactory.getCarbonFile(indexFile).delete()
+            oldIndexAndMergeIndexFiles.add(indexFile)
           }
         }
     }
+    SegmentIndexFileStore.setOldIndexAndMergeIndexFiles(oldIndexAndMergeIndexFiles)

Review comment:
       modified one existing test case and added one to check data file merge without enabling index merge property.

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -353,6 +356,79 @@ object CarbonStore {
     }
   }
 
+  /**
+   * Clean invalid and expired index files of carbon table.
+   *
+   * @param carbonTable CarbonTable
+   */
+  def cleanUpIndexFiles(carbonTable: CarbonTable): Unit = {
+    val validSegments = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+      .getValidAndInvalidSegments.getValidSegments.asScala.toList

Review comment:
       `deleteLoadsAndUpdateMetadata` reads loadmetadata and is called from multiple places. so I think it's better to read inside method.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
##########
@@ -352,27 +343,20 @@ object SecondaryIndexCreator {
       var tableStatusUpdateForFailure = false
 
       if (successSISegments.nonEmpty && !isCompactionCall) {
-        tableStatusUpdateForSuccess = FileInternalUtil.updateTableStatus(
-          successSISegments,
-          secondaryIndexModel.carbonLoadModel.getDatabaseName,
-          secondaryIndexModel.secondaryIndex.indexName,
-          SegmentStatus.INSERT_IN_PROGRESS,
-          secondaryIndexModel.segmentIdToLoadStartTimeMapping,
-          segmentToLoadStartTimeMap,
-          indexCarbonTable,
-          secondaryIndexModel.sqlContext.sparkSession)
-
         // merge index files for success segments in case of only load
         CarbonMergeFilesRDD.mergeIndexFiles(secondaryIndexModel.sqlContext.sparkSession,
           successSISegments,
           segmentToLoadStartTimeMap,
           indexCarbonTable.getTablePath,
           indexCarbonTable, mergeIndexProperty = false)
-
         val loadMetadataDetails = SegmentStatusManager
           .readLoadMetadata(indexCarbonTable.getMetadataPath)
           .filter(loadMetadataDetail => successSISegments.contains(loadMetadataDetail.getLoadName))
-
+        for (loadMetadata <- loadMetadataDetails) {

Review comment:
       Done

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -464,6 +472,38 @@ public boolean accept(CarbonFile file) {
     return null;
   }
 
+  public static List<String> getMergedIndexFiles(CarbonFile[] indexFiles) throws IOException {
+    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+    List<String> mergedIndexFiles = new ArrayList<>();
+    long lastModifiedTime = 0L;
+    long length = 0L;
+    CarbonFile validMergeIndexFile = null;
+    List<CarbonFile> mergeIndexCarbonFiles = new ArrayList<>();
+    for (CarbonFile file : indexFiles) {
+      if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        indexFileStore.readMergeFile(file.getCanonicalPath());
+        Map<String, List<String>> carbonMergeFileToIndexFilesMap =
+            indexFileStore.getCarbonMergeFileToIndexFilesMap();
+        mergedIndexFiles.addAll(carbonMergeFileToIndexFilesMap.get(file.getCanonicalPath()));
+        // In case there are more than 1 mergeindex files present, get the latest one.
+        if (file.getLastModifiedTime() > lastModifiedTime || file.getLength() > length) {

Review comment:
       done

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -353,6 +356,79 @@ object CarbonStore {
     }
   }
 
+  /**
+   * Clean invalid and expired index files of carbon table.
+   *
+   * @param carbonTable CarbonTable
+   */
+  def cleanUpIndexFiles(carbonTable: CarbonTable): Unit = {
+    val validSegments = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+      .getValidAndInvalidSegments.getValidSegments.asScala.toList
+    validSegments.foreach( segment => {
+      val sfs: SegmentFileStore = new SegmentFileStore(carbonTable.getTablePath,
+        segment.getSegmentFileName)
+      var indexFiles = List[CarbonFile]()
+      if (carbonTable.isHivePartitionTable) {
+        val partitionSpecs = sfs.getPartitionSpecs.asScala.toList
+        val segmentName = segment.getSegmentFileName.replace(CarbonTablePath.SEGMENT_EXT, "")
+        for (partitionSpec <- partitionSpecs) {
+          var carbonIndexFiles = SegmentIndexFileStore
+            .getCarbonIndexFiles(partitionSpec.getLocation.toString, FileFactory.getConfiguration)
+            .toList
+          carbonIndexFiles = carbonIndexFiles.filter(x => x.getAbsolutePath
+            .contains(segmentName.substring(
+              segmentName.indexOf(CarbonCommonConstants.UNDERSCORE) + 1, segmentName.length)))
+          indexFiles = indexFiles ++ carbonIndexFiles
+          cleanUpIndexFilesForSegment(sfs, indexFiles)
+        }
+      } else {
+        val segmentPath: String = carbonTable.getSegmentPath(segment.getSegmentNo)
+        val carbonIndexFiles = SegmentIndexFileStore
+          .getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration)

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -327,6 +315,21 @@ public static boolean writeSegmentFile(CarbonTable carbonTable, Segment segment)
     return false;
   }
 
+  public static void setIndexFileNamesToFolderDetails(FolderDetails folderDetails,
+      CarbonFile[] indexFiles) throws IOException {
+    List<String> mergedAndInvalidIndexFiles = getInvalidAndMergedIndexFiles(indexFiles);
+    for (CarbonFile file : indexFiles) {
+      // do not include already merged index files details in segment file.
+      if (!mergedAndInvalidIndexFiles.contains(file.getName())) {

Review comment:
       I have changed the method from `getInvalidAndMergedIndexFiles` to `getValidCarbonIndexFiles`. In `getValidCarbonIndexFiles`, I have added the empty list check.

##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
##########
@@ -79,18 +83,30 @@ public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
   @Override
   public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException {
     Map<String, String> indexFiles;
-    if (segment.getSegmentFileName() == null) {
+    SegmentFileStore fileStore =
+        new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
+    if (fileStore.getSegmentFile() == null) {
       String path =
           CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
       indexFiles = new SegmentIndexFileStore().getMergeOrIndexFilesFromSegment(path);
     } else {
-      SegmentFileStore fileStore =
-          new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
       indexFiles = fileStore.getIndexOrMergeFiles();
       if (fileStore.getSegmentFile() != null) {
         segment.setSegmentMetaDataInfo(fileStore.getSegmentFile().getSegmentMetaDataInfo());
       }
     }
+    List<String> index = new ArrayList<>(indexFiles.keySet());

Review comment:
       without a list, getting concurrent modification exception. made the list to null at the end.

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1366,18 +1412,10 @@ public static void removeTempFolder(Map<String, FolderDetails> locationMap, Stri
   /**
    * This method returns the list of index/merge index files for a segment in carbonTable.
    */
-  public static Set<String> getIndexFilesListForSegment(Segment segment, String tablePath)
-      throws IOException {
+  public static Set<String> getIndexFilesListForSegment(Segment segment, String tablePath) {
     Set<String> indexFiles;
-    if (segment.getSegmentFileName() == null) {
-      String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segment.getSegmentNo());
-      indexFiles =
-          new SegmentIndexFileStore().getMergeOrIndexFilesFromSegment(segmentPath).keySet();
-    } else {
-      SegmentFileStore segmentFileStore =

Review comment:
       reverted and modified to collect invalid index files also.
     

##########
File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
##########
@@ -361,6 +361,11 @@ private CarbonCommonConstants() {
 
   public static final String CARBON_INDEX_SCHEMA_STORAGE_DATABASE = "DATABASE";
 
+  @CarbonProperty
+  public static final String CARBON_INDEXFILES_DELETETIME = "carbon.index.delete.time";

Review comment:
       Done. Removed `CARBON_INDEXFILES_DELETETIME` and modified to use `max.query.execution.time`

##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
##########
@@ -139,11 +141,20 @@ private void prepareLoadMetadata() {
     if (null == index) {
       index = new LinkedList<>();
     }
+    CarbonFile[] indexFiles = new CarbonFile[index.size()];
+    for (int i = 0; i < index.size(); i++) {
+      indexFiles[i] = FileFactory.getCarbonFile(index.get(i));
+    }
+    List<String> mergedIndexFiles = SegmentFileStore.getMergedIndexFiles(indexFiles);
     for (String indexPath : index) {
       if (indexPath.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {

Review comment:
       Done.

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -353,6 +356,79 @@ object CarbonStore {
     }
   }
 
+  /**
+   * Clean invalid and expired index files of carbon table.
+   *
+   * @param carbonTable CarbonTable
+   */
+  def cleanUpIndexFiles(carbonTable: CarbonTable): Unit = {
+    val validSegments = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+      .getValidAndInvalidSegments.getValidSegments.asScala.toList

Review comment:
       Done. Similarly made changes to use the same metadataDetails for `cleanUpDeltaFiles`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] kunal642 commented on a change in pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

kunal642 commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r516152662



##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
##########
@@ -79,13 +83,24 @@ public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
   @Override
   public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException {
     Map<String, String> indexFiles;
-    if (segment.getSegmentFileName() == null) {
+    SegmentFileStore fileStore =

Review comment:
       based on the old check if segmentFileName can be null then this code can fail..Please check this

##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
##########
@@ -79,13 +83,24 @@ public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
   @Override
   public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException {
     Map<String, String> indexFiles;
-    if (segment.getSegmentFileName() == null) {
+    SegmentFileStore fileStore =
+        new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
+    if (fileStore.getSegmentFile() == null) {
       String path =
           CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
       indexFiles = new SegmentIndexFileStore().getMergeOrIndexFilesFromSegment(path);
+      List<String> indexFileList = new ArrayList<>(indexFiles.keySet());
+      Set<String> mergedIndexFiles =
+          SegmentFileStore.getInvalidAndMergedIndexFiles(indexFileList);
+      for (String indexFile : indexFileList) {
+        if (mergedIndexFiles
+            .contains(indexFile.substring(indexFile.lastIndexOf(File.separator) + 1))) {
+          // do not include already merged index files details.
+          indexFiles.remove(indexFile);
+        }
+      }
+      indexFileList = null;

Review comment:
       this is not needed

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
##########
@@ -2558,8 +2558,8 @@ public static long getCarbonIndexSize(SegmentFileStore fileStore,
   // Get the total size of carbon data and the total size of carbon index
   public static HashMap<String, Long> getDataSizeAndIndexSize(String tablePath,
       Segment segment) throws IOException {
-    if (segment.getSegmentFileName() != null) {
-      SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName());
+    SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName());
+    if (fileStore.getSegmentFile() != null) {

Review comment:
       same as previous check..Please check if this will fail when segmmentFileName is null

##########
File path: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
##########
@@ -79,6 +79,11 @@
    */
   private Map<String, List<String>> carbonMergeFileToIndexFilesMap;
 
+  /**
+   * Stores the list of invalid index files of the SI segments in case of small files.
+   */
+  private static Set<String> oldSIIndexAndMergeIndexFiles = new HashSet<>();

Review comment:
       This would fail in case of concurrent queries as 1 operation may override for another

##########
File path: core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
##########
@@ -274,20 +274,22 @@ public static boolean updateTableMetadataStatus(Set<Segment> updatedSegmentsList
 
         LoadMetadataDetails[] listOfLoadFolderDetailsArray =
                 SegmentStatusManager.readLoadMetadata(metaDataFilepath);
-
+        Boolean isUpdateRequired = false;

Review comment:
       use boolean instead of Boolean




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] akashrn5 commented on a change in pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

akashrn5 commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r515746959



##########
File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
##########
@@ -361,6 +361,11 @@ private CarbonCommonConstants() {
 
   public static final String CARBON_INDEX_SCHEMA_STORAGE_DATABASE = "DATABASE";
 
+  @CarbonProperty
+  public static final String CARBON_INDEXFILES_DELETETIME = "carbon.index.delete.time";

Review comment:
       agree with @ajantha-bhat , we can reuse the query execution timeout and delete as its also one hour.

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.cleanfiles
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.index.Segment
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
+
+  var count = 0
+
+  override protected def beforeAll(): Unit = {
+    sql("DROP TABLE IF EXISTS cleantest")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME, "0")
+  }
+
+  override protected def afterAll(): Unit = {
+    sql("DROP TABLE IF EXISTS cleantest")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME,
+        CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME.toString)
+  }
+
+  test("test clean files command for index files") {
+    sql("drop table if exists cleantest")
+    sql("create table cleantest(id int, issue date) STORED AS carbondata")
+    sql("insert into table cleantest select '1','2000-02-01'")
+    assert(getIndexFileCountFromSegmentPath("default_cleantest", "0") == 1)
+    sql("clean files for table cleantest")
+    assert(getIndexFileCountFromSegmentPath("default_cleantest", "0") == 0)
+    sql("drop table if exists cleantest")
+  }
+
+  test("test clean files command for index files with SI") {
+    sql("drop table if exists cleantest")
+    sql("create table cleantest(id int, issue date, name string) STORED AS carbondata")
+    sql("insert into table cleantest select '1','2000-02-01', 'abc' ")
+    sql("create index indextable1 on table cleantest (name) AS 'carbondata'")
+    assert(getIndexFileCountFromSegmentPath("default_cleantest", "0") == 1)
+    assert(getIndexFileCountFromSegmentPath("default_indextable1", "0") == 1)
+    sql("clean files for table cleantest")
+    assert(getIndexFileCountFromSegmentPath("default_cleantest", "0") == 0)
+    assert(getIndexFileCountFromSegmentPath("default_indextable1", "0") == 0)
+    sql("drop table if exists cleantest")
+  }
+
+  test("test clean files command on partition table") {
+    sql("drop table if exists cleantest")
+    sql("create table cleantest(id int, issue date) STORED AS carbondata " +
+        "partitioned by (name string)")
+    sql("insert into table cleantest select '1','2000-02-01', 'abc' ")
+    assert(getIndexFileCountFromSegmentPath("default_cleantest", "0") == 1)
+    sql("clean files for table cleantest")
+    assert(getIndexFileCountFromSegmentPath("default_cleantest", "0") == 0)
+    sql("drop table if exists cleantest")
+  }
+
+  test("test clean files command for index files after update") {
+    sql("drop table if exists cleantest")
+    sql("create table cleantest(id int, name string) STORED AS carbondata")
+    sql("insert into table cleantest select '1', 'abc' ")
+    sql("insert into table cleantest select '2', 'abc' ")
+    assert(getIndexFileCountFromSegmentPath("default_cleantest", "0") == 1)
+    assert(getIndexFileCountFromSegmentPath("default_cleantest", "1") == 1)
+    sql("update cleantest set (name)=('xyz') where id=2")
+    assert(getIndexFileCountFromSegmentPath("default_cleantest", "1") == 2)
+    sql("clean files for table cleantest")
+    assert(getIndexFileCountFromSegmentPath("default_cleantest", "1") == 1)
+    checkAnswer(sql("select *from cleantest where id=2"), Seq(Row(2, "xyz")))
+  }
+
+  test("test clean files without mergeindex") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
+    sql("drop table if exists cleantest")
+    sql("create table cleantest(id int, name string) STORED AS carbondata")
+    sql("insert into table cleantest select '1', 'abc' ")
+    assert(getIndexFileCountFromSegmentPath("default_cleantest", "0") == 1)
+    assert(getIndexFileCountFromSegmentFile("default_cleantest", "0") == 1)
+    sql("clean files for table cleantest")
+    assert(getIndexFileCountFromSegmentPath("default_cleantest", "0") == 1)
+    assert(getIndexFileCountFromSegmentFile("default_cleantest", "0") == 1)
+    checkAnswer(sql("select *from cleantest where id=1"), Seq(Row(1, "abc")))
+  }
+
+  test("test clean files after index merge and check segment count") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
+    sql("drop table if exists cleantest")
+    sql(s"create table cleantest(id int, issue date) STORED AS carbondata")
+    sql("insert into table cleantest select '1','2000-02-01'")
+    assert(getIndexFileCountFromSegmentPath("default_cleantest", "0") == 1)
+    assert(getSegmentFileCount("default_cleantest") == 1)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+    sql("ALTER TABLE cleantest COMPACT 'SEGMENT_INDEX'").collect()
+    assert(getSegmentFileCount("default_cleantest") == 2)
+    assert(getIndexFileCountFromSegmentPath("default_cleantest", "0") == 1)
+    assert(getIndexFileCountFromSegmentPath("default_cleantest",
+      "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 1)
+    sql("clean files for table cleantest")
+    assert(getSegmentFileCount("default_cleantest") == 1)
+    assert(getIndexFileCountFromSegmentPath("default_cleantest", "0") == 0)
+  }
+
+  private def getSegmentFileCount(tableName: String): Int = {
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_cleantest")
+    val segmentsPath = CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath)
+    FileFactory.getCarbonFile(segmentsPath).listFiles(true).size()
+  }
+
+  private def getIndexFileCountFromSegmentFile(tableName: String, segmentNo: String): Int = {
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableName)
+    val segment = Segment.getSegment(segmentNo, carbonTable.getTablePath)
+    new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
+      .getIndexFiles.size()
+  }
+
+  private def getIndexFileCountFromSegmentPath(tableName: String,

Review comment:
       is this method used somewhere also? if yes, do not duplicate the code, you can add in any test utility class and then use everywhere same

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.cleanfiles
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.index.Segment
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
+
+  var count = 0
+
+  override protected def beforeAll(): Unit = {
+    sql("DROP TABLE IF EXISTS cleantest")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME, "0")
+  }
+
+  override protected def afterAll(): Unit = {
+    sql("DROP TABLE IF EXISTS cleantest")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME,
+        CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME.toString)
+  }
+
+  test("test clean files command for index files") {
+    sql("drop table if exists cleantest")
+    sql("create table cleantest(id int, issue date) STORED AS carbondata")
+    sql("insert into table cleantest select '1','2000-02-01'")
+    assert(getIndexFileCountFromSegmentPath("default_cleantest", "0") == 1)
+    sql("clean files for table cleantest")
+    assert(getIndexFileCountFromSegmentPath("default_cleantest", "0") == 0)
+    sql("drop table if exists cleantest")
+  }
+
+  test("test clean files command for index files with SI") {

Review comment:
       combine `test clean files command for index files` and `test clean files command for index files with SI` in a single test case, as not much diff




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] akashrn5 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

akashrn5 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-720916001


   @ShreelekhyaG please add SI and update, delete behavior also in the PR description


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-721146477


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3011/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-721147068


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4769/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-721263694


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4770/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-721266363


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3012/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-721919113


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4775/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-721925160


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3017/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-722404214


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4778/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-722406337


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3020/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-722478978


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4779/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-722482714


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3021/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3988: [CARBONDATA-4037] Improve the table status and segment file writing

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#issuecomment-724163252


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4787/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


12345 ... 9