[GitHub] [carbondata] ajantha-bhat opened a new pull request #3871: [WIP] Fix multiple issues

GitBox

ajantha-bhat opened a new pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871


    ### Why is this PR needed?
   
    - Auto compaction/minor compaction was being triggered multiple times for the same segments.
    - Failures on the executor (while writing the segment file for merge index and merged data files) and table status update failures were not handled.
    - When compaction fails, merge index should not be called.
    - The segment file was not cleaned up when the table status update failed during compaction.
    - Several table status retry issues.
   
    ### What changes were proposed in this PR?
   
       
    ### Does this PR introduce any user interface change?
    - No
    - Yes. (please explain the change and update document)
   
    ### Is any new testcase added?
    - No
    - Yes
   
       
   



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3871: [WIP] Fix multiple issues

GitBox

CarbonDataQA1 commented on pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#issuecomment-665517022







[GitHub] [carbondata] akashrn5 commented on pull request #3871: [WIP] Fix multiple issues

GitBox
In reply to this post by GitBox

akashrn5 commented on pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#issuecomment-667921823


   @ajantha-bhat please resolve conflicts



[GitHub] [carbondata] akashrn5 commented on a change in pull request #3871: [WIP] Fix multiple issues

GitBox
In reply to this post by GitBox

akashrn5 commented on a change in pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#discussion_r464307098



##########
File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
##########
@@ -234,13 +235,15 @@ public String writeMergeIndexFileBasedOnSegmentFile(String segmentId,
         }
         if (FileFactory.getCarbonFile(entry.getKey()).equals(FileFactory.getCarbonFile(location))) {
           segment.getValue().setMergeFileName(mergeIndexFile);
+          mergeIndexFiles.add(entry.getKey() + "/" + mergeIndexFile);
           segment.getValue().setFiles(new HashSet<String>());
           break;
         }
       }
       if (table.isHivePartitionTable()) {
         for (PartitionSpec partitionSpec : partitionSpecs) {
           if (partitionSpec.getLocation().toString().equals(partitionPath)) {
+            // TODO: handle failure here

Review comment:
       Does this TODO need to be handled in this PR? Please handle it if yes.

##########
File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
##########
@@ -234,13 +235,15 @@ public String writeMergeIndexFileBasedOnSegmentFile(String segmentId,
         }
         if (FileFactory.getCarbonFile(entry.getKey()).equals(FileFactory.getCarbonFile(location))) {
           segment.getValue().setMergeFileName(mergeIndexFile);
+          mergeIndexFiles.add(entry.getKey() + "/" + mergeIndexFile);

Review comment:
       Use the `FILE_SEPARATOR` constant.
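
   For illustration, the requested change is roughly the following (a sketch; the updated diff later in this thread confirms `CarbonCommonConstants.FILE_SEPARATOR` is the constant meant):

   ```java
   // Before: hard-coded path separator
   mergeIndexFiles.add(entry.getKey() + "/" + mergeIndexFile);
   // After: use the FILE_SEPARATOR constant
   mergeIndexFiles.add(entry.getKey() + CarbonCommonConstants.FILE_SEPARATOR + mergeIndexFile);
   ```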

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
##########
@@ -373,11 +381,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       } else {
         true
       }
-      // here either of the conditions can be true, when delete segment is fired after compaction
-      // has started, statusFileUpdation will be false , but at the same time commitComplete can be

Review comment:
       Here, instead of deleting the complete comment, maybe you can rewrite the part of the comment that is required for `commitComplete`.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
##########
@@ -357,7 +357,6 @@ object SecondaryIndexCreator {
       if (null != executorService) {
         executorService.shutdownNow()
       }
-

Review comment:
       revert this change

##########
File path: processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
##########
@@ -54,39 +59,77 @@
   public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
       final boolean isCompactionFlow) throws IOException {
     String metaDataLocation = carbonTable.getMetadataPath();
-    //delete folder which metadata no exist in tablestatus
-    String partitionPath = CarbonTablePath.getPartitionDir(carbonTable.getTablePath());
-    if (FileFactory.isFileExist(partitionPath)) {
-      final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
-      CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath);
-      CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
-        @Override
-        public boolean accept(CarbonFile path) {
-          String segmentId =
-              CarbonTablePath.DataFileUtil.getSegmentIdFromPath(path.getAbsolutePath() + "/dummy");
-          boolean found = false;
-          for (int j = 0; j < details.length; j++) {
-            if (details[j].getLoadName().equals(segmentId)) {
-              found = true;
-              break;
+    ICarbonLock carbonTableStatusLock = CarbonLockFactory
+        .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK);
+    try {
+      int retryCount = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+              CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+      int maxTimeout = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+              CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
+      if (carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout)) {
+        //delete folder which metadata no exist in tablestatus
+        String partitionPath = CarbonTablePath.getPartitionDir(carbonTable.getTablePath());
+        if (FileFactory.isFileExist(partitionPath)) {
+          final LoadMetadataDetails[] details =
+              SegmentStatusManager.readLoadMetadata(metaDataLocation);
+          CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath);
+          CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+            @Override public boolean accept(CarbonFile path) {
+              String segmentId = CarbonTablePath.DataFileUtil
+                  .getSegmentIdFromPath(path.getAbsolutePath() + "/dummy");
+              boolean found = false;
+              for (int j = 0; j < details.length; j++) {
+                if (details[j].getLoadName().equals(segmentId)) {
+                  found = true;
+                  break;
+                }
+              }
+              return !found;
+            }
+          });
+          CarbonFile[] allSegmentFiles = null;
+          if (isCompactionFlow) {
+            String segmentFilesLocation =
+                CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath());
+            allSegmentFiles = FileFactory.getCarbonFile(segmentFilesLocation).listFiles();
+          }
+          for (int k = 0; k < listFiles.length; k++) {
+            String segmentId = CarbonTablePath.DataFileUtil
+                .getSegmentIdFromPath(listFiles[k].getAbsolutePath() + "/dummy");
+            if (isCompactionFlow) {
+              if (segmentId.contains(".")) {
+                CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
+                // if segment file exist delete it.
+                for (CarbonFile segment : allSegmentFiles) {
+                  if (segment.getPath().contains(segmentId)) {
+                    segment.delete();
+                    break;
+                  }
+                }
+              }
+            } else {
+              if (!segmentId.contains(".")) {
+                CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
+              }
             }
           }
-          return !found;
         }
-      });
-      for (int k = 0; k < listFiles.length; k++) {
-        String segmentId = CarbonTablePath.DataFileUtil
-            .getSegmentIdFromPath(listFiles[k].getAbsolutePath() + "/dummy");
+      } else {
         if (isCompactionFlow) {
-          if (segmentId.contains(".")) {
-            CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
-          }
-        } else {
-          if (!segmentId.contains(".")) {
-            CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
-          }
+          LOGGER.error(
+              "Not able to acquire the Table status lock for partial load deletion for table "
+                  + carbonTable.getDatabaseName() + "." + carbonTable.getTableName()
+                  + ", retry compaction");
+          throw new RuntimeException(

Review comment:
       Do we really need to throw an exception here? Since it's cleanup, we could just log an error message and retry next time instead of failing the operation. What do you think?
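
   A minimal sketch of the log-and-retry-later alternative suggested above (hypothetical; the thread later settles on throwing for the compaction flow):

   ```java
   } else {
     // Alternative (sketch): log and skip the cleanup for now; the stale
     // data would simply be attempted again on the next cleanup run.
     LOGGER.error("Not able to acquire the Table status lock for partial load deletion for table "
         + carbonTable.getDatabaseName() + "." + carbonTable.getTableName());
   }
   ```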

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
##########
@@ -173,11 +180,15 @@ public static boolean recordLoadMetadata(List<LoadMetadataDetails> newLoadMetada
         LOGGER.error(
             "Not able to acquire the lock for Table status updation for table " + databaseName + "."
                 + tableName);
+        throw new RuntimeException(
+            "Not able to acquire the lock for Table status updation for table " + databaseName + "."
+                + tableName);
       }
     } catch (IOException e) {
       LOGGER.error(
           "Not able to acquire the lock for Table status updation for table " + databaseName + "."
               + tableName);
+

Review comment:
       revert this change

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
##########
@@ -235,18 +235,6 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
         throw new ErrorMessage(s"Number of columns in Index table cannot be more than " +
                                "number of key columns in Source table")
       }
-
-      var isColsIndexedAsPerTable = true

Review comment:
       Can you please tell me why this change is required?





[GitHub] [carbondata] akashrn5 commented on pull request #3871: [WIP] Fix multiple issues

GitBox
In reply to this post by GitBox

akashrn5 commented on pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#issuecomment-667926580


   @ajantha-bhat please create a JIRA and update the PR description



[GitHub] [carbondata] ajantha-bhat commented on pull request #3871: [WIP] Fix multiple issues

GitBox
In reply to this post by GitBox

ajantha-bhat commented on pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#issuecomment-667927752


   @akashrn5: This is a WIP PR, please wait.



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3871: [WIP] Fix multiple issues

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#issuecomment-691855754


    Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4057/
   



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3871: [WIP] Fix multiple issues

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#issuecomment-691858887


    Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2319/
   



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3871: [WIP] Fix multiple issues

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#issuecomment-691996188


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2327/
   



[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3871: [CARBONDATA-3986] Fix multiple issues

GitBox
In reply to this post by GitBox

ajantha-bhat commented on a change in pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#discussion_r487850404



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
##########
@@ -373,11 +381,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       } else {
         true
       }
-      // here either of the conditions can be true, when delete segment is fired after compaction
-      // has started, statusFileUpdation will be false , but at the same time commitComplete can be

Review comment:
       The existing comment in the code explained why we needed to consider both flags. Now only one flag is there, so it is understandable.
   
   If it is still not clear from the new code and you have any suggestion for a comment, you can give it here and I will add it.
   





[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3871: [CARBONDATA-3986] Fix multiple issues

GitBox
In reply to this post by GitBox

ajantha-bhat commented on a change in pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#discussion_r487851114



##########
File path: processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
##########
@@ -54,39 +59,77 @@
   public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
       final boolean isCompactionFlow) throws IOException {
     String metaDataLocation = carbonTable.getMetadataPath();
-    //delete folder which metadata no exist in tablestatus
-    String partitionPath = CarbonTablePath.getPartitionDir(carbonTable.getTablePath());
-    if (FileFactory.isFileExist(partitionPath)) {
-      final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
-      CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath);
-      CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
-        @Override
-        public boolean accept(CarbonFile path) {
-          String segmentId =
-              CarbonTablePath.DataFileUtil.getSegmentIdFromPath(path.getAbsolutePath() + "/dummy");
-          boolean found = false;
-          for (int j = 0; j < details.length; j++) {
-            if (details[j].getLoadName().equals(segmentId)) {
-              found = true;
-              break;
+    ICarbonLock carbonTableStatusLock = CarbonLockFactory
+        .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK);
+    try {
+      int retryCount = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+              CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+      int maxTimeout = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+              CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
+      if (carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout)) {
+        //delete folder which metadata no exist in tablestatus
+        String partitionPath = CarbonTablePath.getPartitionDir(carbonTable.getTablePath());
+        if (FileFactory.isFileExist(partitionPath)) {
+          final LoadMetadataDetails[] details =
+              SegmentStatusManager.readLoadMetadata(metaDataLocation);
+          CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath);
+          CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+            @Override public boolean accept(CarbonFile path) {
+              String segmentId = CarbonTablePath.DataFileUtil
+                  .getSegmentIdFromPath(path.getAbsolutePath() + "/dummy");
+              boolean found = false;
+              for (int j = 0; j < details.length; j++) {
+                if (details[j].getLoadName().equals(segmentId)) {
+                  found = true;
+                  break;
+                }
+              }
+              return !found;
+            }
+          });
+          CarbonFile[] allSegmentFiles = null;
+          if (isCompactionFlow) {
+            String segmentFilesLocation =
+                CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath());
+            allSegmentFiles = FileFactory.getCarbonFile(segmentFilesLocation).listFiles();
+          }
+          for (int k = 0; k < listFiles.length; k++) {
+            String segmentId = CarbonTablePath.DataFileUtil
+                .getSegmentIdFromPath(listFiles[k].getAbsolutePath() + "/dummy");
+            if (isCompactionFlow) {
+              if (segmentId.contains(".")) {
+                CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
+                // if segment file exist delete it.
+                for (CarbonFile segment : allSegmentFiles) {
+                  if (segment.getPath().contains(segmentId)) {
+                    segment.delete();
+                    break;
+                  }
+                }
+              }
+            } else {
+              if (!segmentId.contains(".")) {
+                CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
+              }
             }
           }
-          return !found;
         }
-      });
-      for (int k = 0; k < listFiles.length; k++) {
-        String segmentId = CarbonTablePath.DataFileUtil
-            .getSegmentIdFromPath(listFiles[k].getAbsolutePath() + "/dummy");
+      } else {
         if (isCompactionFlow) {
-          if (segmentId.contains(".")) {
-            CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
-          }
-        } else {
-          if (!segmentId.contains(".")) {
-            CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
-          }
+          LOGGER.error(
+              "Not able to acquire the Table status lock for partial load deletion for table "
+                  + carbonTable.getDatabaseName() + "." + carbonTable.getTableName()
+                  + ", retry compaction");
+          throw new RuntimeException(

Review comment:
       Yes, I faced this issue in the compaction flow. If the stale segments are not cleared, there will be duplicate data after compaction, so I cannot go ahead with compaction if the stale data is not cleaned. That is why the exception is thrown here, to retry the compaction.
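
   In other words (a sketch of the reasoning; the exact exception message is truncated in the quoted diff above):

   ```java
   // Sketch: stale segments left behind would be read again during compaction and
   // produce duplicate data in the compacted segment, so the operation fails fast
   // and the caller is expected to retry the compaction. (Message text is illustrative.)
   throw new RuntimeException(
       "Unable to acquire table status lock to clean stale data, retry compaction");
   ```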





[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3871: [CARBONDATA-3986] Fix multiple issues

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#issuecomment-692003279


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4066/
   



[GitHub] [carbondata] ajantha-bhat edited a comment on pull request #3871: [CARBONDATA-3986] Fix multiple issues

GitBox
In reply to this post by GitBox

ajantha-bhat edited a comment on pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#issuecomment-692003597


   @akashrn5, it was a WIP PR. Now I have had some time to work on it and have removed the WIP tag.
   The PR is ready; you can review it afresh and merge.



[GitHub] [carbondata] akashrn5 commented on a change in pull request #3871: [CARBONDATA-3986] Fix multiple issues during compaction and concurrent scenarios

GitBox
In reply to this post by GitBox

akashrn5 commented on a change in pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#discussion_r487993198



##########
File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
##########
@@ -233,15 +234,24 @@ public String writeMergeIndexFileBasedOnSegmentFile(String segmentId,
         }
         if (FileFactory.getCarbonFile(entry.getKey()).equals(FileFactory.getCarbonFile(location))) {
           segment.getValue().setMergeFileName(mergeIndexFile);
-          segment.getValue().setFiles(new HashSet<String>());
+          mergeIndexFiles
+              .add(entry.getKey() + CarbonCommonConstants.FILE_SEPARATOR + mergeIndexFile);
+          segment.getValue().setFiles(new HashSet<>());
           break;
         }
       }
       if (table.isHivePartitionTable()) {
         for (PartitionSpec partitionSpec : partitionSpecs) {
           if (partitionSpec.getLocation().toString().equals(partitionPath)) {
-            SegmentFileStore.writeSegmentFile(table.getTablePath(), mergeIndexFile, partitionPath,
-                segmentId + "_" + uuid + "", partitionSpec.getPartitions(), true);
+            try {
+              SegmentFileStore.writeSegmentFile(table.getTablePath(), mergeIndexFile, partitionPath,
+                  segmentId + "_" + uuid + "", partitionSpec.getPartitions(), true);
+            } catch (Exception ex) {
+              // delete merge index file if created,
+              // keep only index files as segment file writing is failed

Review comment:
       Can you add an error log here with just the message, not the throwable?
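
   A minimal sketch of the requested logging (assuming a standard `Logger` where passing only the message omits the stack trace; the message text is illustrative):

   ```java
   // Inside the catch block: log just the message, not the throwable,
   // so the failure is recorded without attaching the stack trace.
   LOGGER.error("Failed to write segment file for segment " + segmentId
       + "; deleting merge index file and keeping only index files");
   ```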

##########
File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
##########
@@ -251,9 +261,29 @@ public String writeMergeIndexFileBasedOnSegmentFile(String segmentId,
     String path = CarbonTablePath.getSegmentFilesLocation(table.getTablePath())
         + CarbonCommonConstants.FILE_SEPARATOR + newSegmentFileName;
     if (!table.isHivePartitionTable()) {
-      SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path);
-      SegmentFileStore.updateTableStatusFile(table, segmentId, newSegmentFileName,
+      String content = SegmentStatusManager.readFileAsString(path);
+      try {
+        SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path);
+      } catch (Exception ex) {
+        // delete merge index file if created,
+        // keep only index files as segment file writing is failed
+        for (String file : mergeIndexFiles) {

Review comment:
       ```suggestion
           for (String mergeIndexFile : mergeIndexFiles) {
   ```

##########
File path: processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
##########
@@ -64,45 +68,89 @@ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
       if (allSegments == null || allSegments.length == 0) {
         return;
       }
-      LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
-      // there is no segment or failed to read tablestatus file.
-      // so it should stop immediately.
-      if (details == null || details.length == 0) {
-        return;
-      }
-      Set<String> metadataSet = new HashSet<>(details.length);
-      for (LoadMetadataDetails detail : details) {
-        metadataSet.add(detail.getLoadName());
-      }
-      List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length);
-      for (CarbonFile segment : allSegments) {
-        String segmentName = segment.getName();
-        // check segment folder pattern
-        if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) {
-          String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE);
-          if (parts.length == 2) {
-            boolean isOriginal = !parts[1].contains(".");
-            if (isCompactionFlow) {
-              // in compaction flow, it should be big segment and segment metadata is not exists
-              if (!isOriginal && !metadataSet.contains(parts[1])) {
-                staleSegments.add(segment);
+      int retryCount = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+              CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+      int maxTimeout = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+              CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
+      ICarbonLock carbonTableStatusLock = CarbonLockFactory
+          .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK);
+      try {
+        if (carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout)) {
+          LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+          // there is no segment or failed to read tablestatus file.
+          // so it should stop immediately.
+          if (details == null || details.length == 0) {
+            return;
+          }
+          Set<String> metadataSet = new HashSet<>(details.length);
+          for (LoadMetadataDetails detail : details) {
+            metadataSet.add(detail.getLoadName());
+          }
+          List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length);
+          Set<String> staleSegmentsId = new HashSet<>(allSegments.length);
+          for (CarbonFile segment : allSegments) {
+            String segmentName = segment.getName();
+            // check segment folder pattern
+            if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) {
+              String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE);
+              if (parts.length == 2) {
+                boolean isOriginal = !parts[1].contains(".");
+                if (isCompactionFlow) {
+                  // in compaction flow, it should be big segment and segment metadata is not exists
+                  if (!isOriginal && !metadataSet.contains(parts[1])) {
+                    staleSegments.add(segment);
+                    staleSegmentsId.add(parts[1]);
+                  }
+                } else {
+                  // in loading flow,
+                  // it should be original segment and segment metadata is not exists

Review comment:
       ```suggestion
                  // it should be original segment and segment metadata does not exist
   ```

##########
File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
##########
@@ -233,15 +234,24 @@ public String writeMergeIndexFileBasedOnSegmentFile(String segmentId,
         }
         if (FileFactory.getCarbonFile(entry.getKey()).equals(FileFactory.getCarbonFile(location))) {
           segment.getValue().setMergeFileName(mergeIndexFile);
-          segment.getValue().setFiles(new HashSet<String>());
+          mergeIndexFiles
+              .add(entry.getKey() + CarbonCommonConstants.FILE_SEPARATOR + mergeIndexFile);
+          segment.getValue().setFiles(new HashSet<>());
           break;
         }
       }
       if (table.isHivePartitionTable()) {
         for (PartitionSpec partitionSpec : partitionSpecs) {
           if (partitionSpec.getLocation().toString().equals(partitionPath)) {
-            SegmentFileStore.writeSegmentFile(table.getTablePath(), mergeIndexFile, partitionPath,
-                segmentId + "_" + uuid + "", partitionSpec.getPartitions(), true);
+            try {
+              SegmentFileStore.writeSegmentFile(table.getTablePath(), mergeIndexFile, partitionPath,
+                  segmentId + "_" + uuid + "", partitionSpec.getPartitions(), true);

Review comment:
       I know it's base code, but can you replace `_` with the constant `UNDERSCORE` and name the boolean variable?
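
   Roughly, that suggestion amounts to the following (a sketch; `isMergeIndexFlow` is an illustrative name, since the quoted diff does not say what the boolean parameter means):

   ```java
   // Use the UNDERSCORE constant instead of "_" and name the boolean argument
   boolean isMergeIndexFlow = true;
   SegmentFileStore.writeSegmentFile(table.getTablePath(), mergeIndexFile, partitionPath,
       segmentId + CarbonCommonConstants.UNDERSCORE + uuid,
       partitionSpec.getPartitions(), isMergeIndexFlow);
   ```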

##########
File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
##########
@@ -251,9 +261,29 @@ public String writeMergeIndexFileBasedOnSegmentFile(String segmentId,
     String path = CarbonTablePath.getSegmentFilesLocation(table.getTablePath())
         + CarbonCommonConstants.FILE_SEPARATOR + newSegmentFileName;
     if (!table.isHivePartitionTable()) {
-      SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path);
-      SegmentFileStore.updateTableStatusFile(table, segmentId, newSegmentFileName,
+      String content = SegmentStatusManager.readFileAsString(path);
+      try {
+        SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path);
+      } catch (Exception ex) {
+        // delete merge index file if created,

Review comment:
       Can you add an error log here with just the message, not the throwable? It will help with any analysis of issues.

##########
File path: processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
##########
@@ -64,45 +68,89 @@ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
       if (allSegments == null || allSegments.length == 0) {
         return;
       }
-      LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
-      // there is no segment or failed to read tablestatus file.
-      // so it should stop immediately.
-      if (details == null || details.length == 0) {
-        return;
-      }
-      Set<String> metadataSet = new HashSet<>(details.length);
-      for (LoadMetadataDetails detail : details) {
-        metadataSet.add(detail.getLoadName());
-      }
-      List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length);
-      for (CarbonFile segment : allSegments) {
-        String segmentName = segment.getName();
-        // check segment folder pattern
-        if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) {
-          String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE);
-          if (parts.length == 2) {
-            boolean isOriginal = !parts[1].contains(".");
-            if (isCompactionFlow) {
-              // in compaction flow, it should be big segment and segment metadata is not exists
-              if (!isOriginal && !metadataSet.contains(parts[1])) {
-                staleSegments.add(segment);
+      int retryCount = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+              CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+      int maxTimeout = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+              CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
+      ICarbonLock carbonTableStatusLock = CarbonLockFactory
+          .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK);
+      try {
+        if (carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout)) {
+          LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+          // there is no segment or failed to read tablestatus file.
+          // so it should stop immediately.
+          if (details == null || details.length == 0) {
+            return;
+          }
+          Set<String> metadataSet = new HashSet<>(details.length);
+          for (LoadMetadataDetails detail : details) {
+            metadataSet.add(detail.getLoadName());
+          }
+          List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length);
+          Set<String> staleSegmentsId = new HashSet<>(allSegments.length);
+          for (CarbonFile segment : allSegments) {
+            String segmentName = segment.getName();
+            // check segment folder pattern
+            if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) {
+              String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE);
+              if (parts.length == 2) {
+                boolean isOriginal = !parts[1].contains(".");
+                if (isCompactionFlow) {
+                  // in compaction flow, it should be big segment and segment metadata is not exists

Review comment:
       ```suggestion
                  // in compaction flow, it should be big segment and segment metadata does not exist
   ```

##########
File path: processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
##########
@@ -64,45 +68,89 @@ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
       if (allSegments == null || allSegments.length == 0) {
         return;
       }
-      LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
-      // there is no segment or failed to read tablestatus file.
-      // so it should stop immediately.
-      if (details == null || details.length == 0) {
-        return;
-      }
-      Set<String> metadataSet = new HashSet<>(details.length);
-      for (LoadMetadataDetails detail : details) {
-        metadataSet.add(detail.getLoadName());
-      }
-      List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length);
-      for (CarbonFile segment : allSegments) {
-        String segmentName = segment.getName();
-        // check segment folder pattern
-        if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) {
-          String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE);
-          if (parts.length == 2) {
-            boolean isOriginal = !parts[1].contains(".");
-            if (isCompactionFlow) {
-              // in compaction flow, it should be big segment and segment metadata is not exists
-              if (!isOriginal && !metadataSet.contains(parts[1])) {
-                staleSegments.add(segment);
+      int retryCount = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+              CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+      int maxTimeout = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+              CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
+      ICarbonLock carbonTableStatusLock = CarbonLockFactory
+          .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK);
+      try {
+        if (carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout)) {
+          LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+          // there is no segment or failed to read tablestatus file.
+          // so it should stop immediately.
+          if (details == null || details.length == 0) {
+            return;
+          }
+          Set<String> metadataSet = new HashSet<>(details.length);
+          for (LoadMetadataDetails detail : details) {
+            metadataSet.add(detail.getLoadName());
+          }
+          List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length);
+          Set<String> staleSegmentsId = new HashSet<>(allSegments.length);
+          for (CarbonFile segment : allSegments) {
+            String segmentName = segment.getName();
+            // check segment folder pattern
+            if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) {
+              String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE);
+              if (parts.length == 2) {
+                boolean isOriginal = !parts[1].contains(".");
+                if (isCompactionFlow) {
+                  // in compaction flow, it should be big segment and segment metadata is not exists
+                  if (!isOriginal && !metadataSet.contains(parts[1])) {
+                    staleSegments.add(segment);
+                    staleSegmentsId.add(parts[1]);
+                  }
+                } else {
+                  // in loading flow,
+                  // it should be original segment and segment metadata is not exists
+                  if (isOriginal && !metadataSet.contains(parts[1])) {
+                    staleSegments.add(segment);
+                    staleSegmentsId.add(parts[1]);
+                  }
+                }
               }
-            } else {
-              // in loading flow, it should be original segment and segment metadata is not exists
-              if (isOriginal && !metadataSet.contains(parts[1])) {
-                staleSegments.add(segment);
+            }
+          }
+          // delete segment folders one by one
+          for (CarbonFile staleSegment : staleSegments) {
+            try {
+              CarbonUtil.deleteFoldersAndFiles(staleSegment);
+            } catch (IOException | InterruptedException e) {
+              LOGGER.error("Unable to delete the given path :: " + e.getMessage(), e);
+            }
+          }
+          if (staleSegments.size() > 0) {
+            // collect the segment metadata path
+            String segmentFilesLocation =
+                CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath());
+            CarbonFile[] allSegmentMetadataFiles =

Review comment:
       Instead of listing all the files, you can list only the files whose names start with an id present in `staleSegmentsId`, and directly delete those.
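
   A sketch of that suggestion, reusing the `CarbonFileFilter` pattern already used in this method (the filter body is an assumption, not code from the PR):

   ```java
   // List only the segment metadata files belonging to stale segments,
   // instead of listing everything and filtering afterwards.
   CarbonFile[] staleMetadataFiles = FileFactory.getCarbonFile(segmentFilesLocation)
       .listFiles(new CarbonFileFilter() {
         @Override
         public boolean accept(CarbonFile file) {
           String segmentId = file.getName().split(CarbonCommonConstants.UNDERSCORE)[0];
           return staleSegmentsId.contains(segmentId);
         }
       });
   for (CarbonFile staleMetadataFile : staleMetadataFiles) {
     staleMetadataFile.delete();
   }
   ```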

##########
File path: processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
##########
@@ -64,45 +68,89 @@ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
       if (allSegments == null || allSegments.length == 0) {
         return;
       }
-      LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
-      // there is no segment or failed to read tablestatus file.
-      // so it should stop immediately.
-      if (details == null || details.length == 0) {
-        return;
-      }
-      Set<String> metadataSet = new HashSet<>(details.length);
-      for (LoadMetadataDetails detail : details) {
-        metadataSet.add(detail.getLoadName());
-      }
-      List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length);
-      for (CarbonFile segment : allSegments) {
-        String segmentName = segment.getName();
-        // check segment folder pattern
-        if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) {
-          String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE);
-          if (parts.length == 2) {
-            boolean isOriginal = !parts[1].contains(".");
-            if (isCompactionFlow) {
-              // in compaction flow, it should be big segment and segment metadata is not exists
-              if (!isOriginal && !metadataSet.contains(parts[1])) {
-                staleSegments.add(segment);
+      int retryCount = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+              CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+      int maxTimeout = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+              CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
+      ICarbonLock carbonTableStatusLock = CarbonLockFactory
+          .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK);
+      try {
+        if (carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout)) {
+          LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+          // there is no segment or failed to read tablestatus file.
+          // so it should stop immediately.
+          if (details == null || details.length == 0) {
+            return;
+          }
+          Set<String> metadataSet = new HashSet<>(details.length);
+          for (LoadMetadataDetails detail : details) {
+            metadataSet.add(detail.getLoadName());
+          }
+          List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length);
+          Set<String> staleSegmentsId = new HashSet<>(allSegments.length);
+          for (CarbonFile segment : allSegments) {
+            String segmentName = segment.getName();
+            // check segment folder pattern
+            if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) {
+              String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE);
+              if (parts.length == 2) {
+                boolean isOriginal = !parts[1].contains(".");
+                if (isCompactionFlow) {
+                  // in compaction flow, it should be big segment and segment metadata is not exists
+                  if (!isOriginal && !metadataSet.contains(parts[1])) {
+                    staleSegments.add(segment);
+                    staleSegmentsId.add(parts[1]);
+                  }
+                } else {
+                  // in loading flow,
+                  // it should be original segment and segment metadata is not exists
+                  if (isOriginal && !metadataSet.contains(parts[1])) {
+                    staleSegments.add(segment);
+                    staleSegmentsId.add(parts[1]);
+                  }
+                }
               }
-            } else {
-              // in loading flow, it should be original segment and segment metadata is not exists
-              if (isOriginal && !metadataSet.contains(parts[1])) {
-                staleSegments.add(segment);
+            }
+          }
+          // delete segment folders one by one
+          for (CarbonFile staleSegment : staleSegments) {
+            try {
+              CarbonUtil.deleteFoldersAndFiles(staleSegment);
+            } catch (IOException | InterruptedException e) {
+              LOGGER.error("Unable to delete the given path :: " + e.getMessage(), e);
+            }
+          }
+          if (staleSegments.size() > 0) {
+            // collect the segment metadata path
+            String segmentFilesLocation =
+                CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath());
+            CarbonFile[] allSegmentMetadataFiles =
+                FileFactory.getCarbonFile(segmentFilesLocation).listFiles();
+            // delete the segment metadata files also
+            for (CarbonFile segmentMetadataFile : allSegmentMetadataFiles) {
+              String segmentId =
+                  segmentMetadataFile.getName().split(CarbonCommonConstants.UNDERSCORE)[0];
+              if (staleSegmentsId.contains(segmentId)) {
+                segmentMetadataFile.delete();
               }
             }
           }
+        } else {
+          LOGGER.error(
+              "Not able to acquire the Table status lock for partial load deletion for table "
+                  + carbonTable.getDatabaseName() + "." + carbonTable.getTableName()
+                  + ", retry compaction");

Review comment:
       Here the error message is the same as the one below ("retry compaction"). Please check and change it.





[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3871: [CARBONDATA-3986] Fix multiple issues during compaction and concurrent scenarios

GitBox
In reply to this post by GitBox

ajantha-bhat commented on a change in pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#discussion_r488444308



##########
File path: processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
##########
@@ -64,45 +68,89 @@ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
       if (allSegments == null || allSegments.length == 0) {
         return;
       }
-      LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
-      // there is no segment or failed to read tablestatus file.
-      // so it should stop immediately.
-      if (details == null || details.length == 0) {
-        return;
-      }
-      Set<String> metadataSet = new HashSet<>(details.length);
-      for (LoadMetadataDetails detail : details) {
-        metadataSet.add(detail.getLoadName());
-      }
-      List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length);
-      for (CarbonFile segment : allSegments) {
-        String segmentName = segment.getName();
-        // check segment folder pattern
-        if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) {
-          String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE);
-          if (parts.length == 2) {
-            boolean isOriginal = !parts[1].contains(".");
-            if (isCompactionFlow) {
-              // in compaction flow, it should be big segment and segment metadata is not exists
-              if (!isOriginal && !metadataSet.contains(parts[1])) {
-                staleSegments.add(segment);
+      int retryCount = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+              CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+      int maxTimeout = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+              CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
+      ICarbonLock carbonTableStatusLock = CarbonLockFactory
+          .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK);
+      try {
+        if (carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout)) {
+          LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+          // there is no segment or failed to read tablestatus file.
+          // so it should stop immediately.
+          if (details == null || details.length == 0) {
+            return;
+          }
+          Set<String> metadataSet = new HashSet<>(details.length);
+          for (LoadMetadataDetails detail : details) {
+            metadataSet.add(detail.getLoadName());
+          }
+          List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length);
+          Set<String> staleSegmentsId = new HashSet<>(allSegments.length);
+          for (CarbonFile segment : allSegments) {
+            String segmentName = segment.getName();
+            // check segment folder pattern
+            if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) {
+              String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE);
+              if (parts.length == 2) {
+                boolean isOriginal = !parts[1].contains(".");
+                if (isCompactionFlow) {
+                  // in compaction flow, it should be big segment and segment metadata is not exists
+                  if (!isOriginal && !metadataSet.contains(parts[1])) {
+                    staleSegments.add(segment);
+                    staleSegmentsId.add(parts[1]);
+                  }
+                } else {
+                  // in loading flow,
+                  // it should be original segment and segment metadata is not exists
+                  if (isOriginal && !metadataSet.contains(parts[1])) {
+                    staleSegments.add(segment);
+                    staleSegmentsId.add(parts[1]);
+                  }
+                }
               }
-            } else {
-              // in loading flow, it should be original segment and segment metadata is not exists
-              if (isOriginal && !metadataSet.contains(parts[1])) {
-                staleSegments.add(segment);
+            }
+          }
+          // delete segment folders one by one
+          for (CarbonFile staleSegment : staleSegments) {
+            try {
+              CarbonUtil.deleteFoldersAndFiles(staleSegment);
+            } catch (IOException | InterruptedException e) {
+              LOGGER.error("Unable to delete the given path :: " + e.getMessage(), e);
+            }
+          }
+          if (staleSegments.size() > 0) {
+            // collect the segment metadata path
+            String segmentFilesLocation =
+                CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath());
+            CarbonFile[] allSegmentMetadataFiles =
+                FileFactory.getCarbonFile(segmentFilesLocation).listFiles();
+            // delete the segment metadata files also
+            for (CarbonFile segmentMetadataFile : allSegmentMetadataFiles) {
+              String segmentId =
+                  segmentMetadataFile.getName().split(CarbonCommonConstants.UNDERSCORE)[0];
+              if (staleSegmentsId.contains(segmentId)) {
+                segmentMetadataFile.delete();
               }
             }
           }
+        } else {
+          LOGGER.error(
+              "Not able to acquire the Table status lock for partial load deletion for table "
+                  + carbonTable.getDatabaseName() + "." + carbonTable.getTableName()
+                  + ", retry compaction");

Review comment:
       Assigned it to a variable.

##########
File path: processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
##########
@@ -64,45 +68,89 @@ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
       if (allSegments == null || allSegments.length == 0) {
         return;
       }
-      LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
-      // there is no segment or failed to read tablestatus file.
-      // so it should stop immediately.
-      if (details == null || details.length == 0) {
-        return;
-      }
-      Set<String> metadataSet = new HashSet<>(details.length);
-      for (LoadMetadataDetails detail : details) {
-        metadataSet.add(detail.getLoadName());
-      }
-      List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length);
-      for (CarbonFile segment : allSegments) {
-        String segmentName = segment.getName();
-        // check segment folder pattern
-        if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) {
-          String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE);
-          if (parts.length == 2) {
-            boolean isOriginal = !parts[1].contains(".");
-            if (isCompactionFlow) {
-              // in compaction flow, it should be big segment and segment metadata is not exists
-              if (!isOriginal && !metadataSet.contains(parts[1])) {
-                staleSegments.add(segment);
+      int retryCount = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+              CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+      int maxTimeout = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+              CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
+      ICarbonLock carbonTableStatusLock = CarbonLockFactory
+          .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK);
+      try {
+        if (carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout)) {
+          LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+          // there is no segment or failed to read tablestatus file.
+          // so it should stop immediately.
+          if (details == null || details.length == 0) {
+            return;
+          }
+          Set<String> metadataSet = new HashSet<>(details.length);
+          for (LoadMetadataDetails detail : details) {
+            metadataSet.add(detail.getLoadName());
+          }
+          List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length);
+          Set<String> staleSegmentsId = new HashSet<>(allSegments.length);
+          for (CarbonFile segment : allSegments) {
+            String segmentName = segment.getName();
+            // check segment folder pattern
+            if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) {
+              String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE);
+              if (parts.length == 2) {
+                boolean isOriginal = !parts[1].contains(".");
+                if (isCompactionFlow) {
+                  // in compaction flow, it should be big segment and segment metadata is not exists
+                  if (!isOriginal && !metadataSet.contains(parts[1])) {
+                    staleSegments.add(segment);
+                    staleSegmentsId.add(parts[1]);
+                  }
+                } else {
+                  // in loading flow,
+                  // it should be original segment and segment metadata is not exists
+                  if (isOriginal && !metadataSet.contains(parts[1])) {
+                    staleSegments.add(segment);
+                    staleSegmentsId.add(parts[1]);
+                  }
+                }
               }
-            } else {
-              // in loading flow, it should be original segment and segment metadata is not exists
-              if (isOriginal && !metadataSet.contains(parts[1])) {
-                staleSegments.add(segment);
+            }
+          }
+          // delete segment folders one by one
+          for (CarbonFile staleSegment : staleSegments) {
+            try {
+              CarbonUtil.deleteFoldersAndFiles(staleSegment);
+            } catch (IOException | InterruptedException e) {
+              LOGGER.error("Unable to delete the given path :: " + e.getMessage(), e);
+            }
+          }
+          if (staleSegments.size() > 0) {
+            // collect the segment metadata path
+            String segmentFilesLocation =
+                CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath());
+            CarbonFile[] allSegmentMetadataFiles =

Review comment:
       ok

##########
File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
##########
@@ -251,9 +261,29 @@ public String writeMergeIndexFileBasedOnSegmentFile(String segmentId,
     String path = CarbonTablePath.getSegmentFilesLocation(table.getTablePath())
         + CarbonCommonConstants.FILE_SEPARATOR + newSegmentFileName;
     if (!table.isHivePartitionTable()) {
-      SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path);
-      SegmentFileStore.updateTableStatusFile(table, segmentId, newSegmentFileName,
+      String content = SegmentStatusManager.readFileAsString(path);
+      try {
+        SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path);
+      } catch (Exception ex) {
+        // delete merge index file if created,

Review comment:
       Both callers catch these exceptions and log them, so the behavior is the same.
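
   As a side note, a self-contained sketch of what "both callers catch and log" looks like; all names below are hypothetical stand-ins, not the actual CarbonData call sites:

       import java.io.IOException;
       import java.util.logging.Logger;

       public class CallerCatchSketch {
         private static final Logger LOGGER = Logger.getLogger(CallerCatchSketch.class.getName());

         // Stand-in for the segment file write that can fail with an IOException.
         static void writeSegmentFile(String path) throws IOException {
           throw new IOException("simulated write failure for " + path);
         }

         public static void main(String[] args) {
           try {
             writeSegmentFile("/tmp/store/Metadata/segments/0_1600000000000");
           } catch (IOException e) {
             // Each caller catches the exception and logs it, so the observable
             // behavior is the same regardless of which caller hits the failure.
             LOGGER.severe("Segment file write failed: " + e.getMessage());
           }
         }
       }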

##########
File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
##########
@@ -233,15 +234,24 @@ public String writeMergeIndexFileBasedOnSegmentFile(String segmentId,
         }
         if (FileFactory.getCarbonFile(entry.getKey()).equals(FileFactory.getCarbonFile(location))) {
           segment.getValue().setMergeFileName(mergeIndexFile);
-          segment.getValue().setFiles(new HashSet<String>());
+          mergeIndexFiles
+              .add(entry.getKey() + CarbonCommonConstants.FILE_SEPARATOR + mergeIndexFile);
+          segment.getValue().setFiles(new HashSet<>());
           break;
         }
       }
       if (table.isHivePartitionTable()) {
         for (PartitionSpec partitionSpec : partitionSpecs) {
           if (partitionSpec.getLocation().toString().equals(partitionPath)) {
-            SegmentFileStore.writeSegmentFile(table.getTablePath(), mergeIndexFile, partitionPath,
-                segmentId + "_" + uuid + "", partitionSpec.getPartitions(), true);
+            try {
+              SegmentFileStore.writeSegmentFile(table.getTablePath(), mergeIndexFile, partitionPath,
+                  segmentId + "_" + uuid + "", partitionSpec.getPartitions(), true);
+            } catch (Exception ex) {
+              // delete merge index file if created,
+              // keep only index files as segment file writing is failed

Review comment:
       Both callers catch this exception and log it, so the behavior is the same.

##########
File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
##########
@@ -233,15 +234,24 @@ public String writeMergeIndexFileBasedOnSegmentFile(String segmentId,
         }
         if (FileFactory.getCarbonFile(entry.getKey()).equals(FileFactory.getCarbonFile(location))) {
           segment.getValue().setMergeFileName(mergeIndexFile);
-          segment.getValue().setFiles(new HashSet<String>());
+          mergeIndexFiles
+              .add(entry.getKey() + CarbonCommonConstants.FILE_SEPARATOR + mergeIndexFile);
+          segment.getValue().setFiles(new HashSet<>());
           break;
         }
       }
       if (table.isHivePartitionTable()) {
         for (PartitionSpec partitionSpec : partitionSpecs) {
           if (partitionSpec.getLocation().toString().equals(partitionPath)) {
-            SegmentFileStore.writeSegmentFile(table.getTablePath(), mergeIndexFile, partitionPath,
-                segmentId + "_" + uuid + "", partitionSpec.getPartitions(), true);
+            try {
+              SegmentFileStore.writeSegmentFile(table.getTablePath(), mergeIndexFile, partitionPath,
+                  segmentId + "_" + uuid + "", partitionSpec.getPartitions(), true);

Review comment:
       > name the boolean variable
   This is Java, which has no named arguments, so only the `_` change was handled.
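
   For readers following along: since Java has no named arguments at the call site, "name the boolean variable" amounts to introducing a local whose name documents the bare `true` literal. A minimal sketch with hypothetical names:

       public class NamedBooleanSketch {
         // Stand-in for the write call; the last flag is the one the review asked to name.
         static void writeSegmentFile(String tablePath, String fileName, boolean isMergeIndexFlow) {
           System.out.println("writing " + fileName + " under " + tablePath
               + " (mergeIndexFlow=" + isMergeIndexFlow + ")");
         }

         public static void main(String[] args) {
           boolean isMergeIndexFlow = true;  // the name replaces an unexplained literal
           writeSegmentFile("/tmp/store/t1", "0_uuid.carbonindexmerge", isMergeIndexFlow);
         }
       }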






[GitHub] [carbondata] akashrn5 commented on a change in pull request #3871: [CARBONDATA-3986] Fix multiple issues during compaction and concurrent scenarios

GitBox
In reply to this post by GitBox

akashrn5 commented on a change in pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#discussion_r488503684



##########
File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
##########
@@ -233,15 +234,24 @@ public String writeMergeIndexFileBasedOnSegmentFile(String segmentId,
         }
         if (FileFactory.getCarbonFile(entry.getKey()).equals(FileFactory.getCarbonFile(location))) {
           segment.getValue().setMergeFileName(mergeIndexFile);
-          segment.getValue().setFiles(new HashSet<String>());
+          mergeIndexFiles
+              .add(entry.getKey() + CarbonCommonConstants.FILE_SEPARATOR + mergeIndexFile);
+          segment.getValue().setFiles(new HashSet<>());
           break;
         }
       }
       if (table.isHivePartitionTable()) {
         for (PartitionSpec partitionSpec : partitionSpecs) {
           if (partitionSpec.getLocation().toString().equals(partitionPath)) {
-            SegmentFileStore.writeSegmentFile(table.getTablePath(), mergeIndexFile, partitionPath,
-                segmentId + "_" + uuid + "", partitionSpec.getPartitions(), true);
+            try {
+              SegmentFileStore.writeSegmentFile(table.getTablePath(), mergeIndexFile, partitionPath,
+                  segmentId + "_" + uuid + "", partitionSpec.getPartitions(), true);
+            } catch (Exception ex) {
+              // delete merge index file if created,
+              // keep only index files as segment file writing is failed

Review comment:
       What I meant to say is: here we can log an error stating that writing the segment file failed, and then throw the exception. We only get the bare IOException here, with no custom message, so adding one will make future analysis easier.
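
   A minimal sketch of that suggestion, assuming hypothetical names: log a custom, searchable message at the failure site, then rethrow the bare IOException.

       import java.io.IOException;
       import java.util.logging.Logger;

       public class LogThenThrowSketch {
         private static final Logger LOGGER = Logger.getLogger(LogThenThrowSketch.class.getName());

         static void writeSegmentFile(String path) throws IOException {
           throw new IOException("simulated failure");  // carries no context of its own
         }

         public static void main(String[] args) throws IOException {
           String path = "/tmp/store/Metadata/segments/2.1_1600000000000";
           try {
             writeSegmentFile(path);
           } catch (IOException e) {
             // The custom message makes the failure point obvious in the logs,
             // even though the rethrown exception itself says nothing about it.
             LOGGER.severe("Writing segment file failed for " + path + ": " + e.getMessage());
             throw e;
           }
         }
       }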

##########
File path: processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
##########
@@ -64,45 +68,85 @@ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
       if (allSegments == null || allSegments.length == 0) {
         return;
       }
-      LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
-      // there is no segment or failed to read tablestatus file.
-      // so it should stop immediately.
-      if (details == null || details.length == 0) {
-        return;
-      }
-      Set<String> metadataSet = new HashSet<>(details.length);
-      for (LoadMetadataDetails detail : details) {
-        metadataSet.add(detail.getLoadName());
-      }
-      List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length);
-      for (CarbonFile segment : allSegments) {
-        String segmentName = segment.getName();
-        // check segment folder pattern
-        if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) {
-          String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE);
-          if (parts.length == 2) {
-            boolean isOriginal = !parts[1].contains(".");
-            if (isCompactionFlow) {
-              // in compaction flow, it should be big segment and segment metadata is not exists
-              if (!isOriginal && !metadataSet.contains(parts[1])) {
-                staleSegments.add(segment);
-              }
-            } else {
-              // in loading flow, it should be original segment and segment metadata is not exists
-              if (isOriginal && !metadataSet.contains(parts[1])) {
-                staleSegments.add(segment);
+      int retryCount = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+              CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+      int maxTimeout = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+              CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
+      ICarbonLock carbonTableStatusLock = CarbonLockFactory
+          .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK);
+      try {
+        if (carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout)) {
+          LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+          // there is no segment or failed to read tablestatus file.
+          // so it should stop immediately.
+          if (details == null || details.length == 0) {
+            return;
+          }
+          Set<String> metadataSet = new HashSet<>(details.length);
+          for (LoadMetadataDetails detail : details) {
+            metadataSet.add(detail.getLoadName());
+          }
+          List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length);
+          Set<String> staleSegmentsId = new HashSet<>(allSegments.length);
+          for (CarbonFile segment : allSegments) {
+            String segmentName = segment.getName();
+            // check segment folder pattern
+            if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) {
+              String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE);
+              if (parts.length == 2) {
+                boolean isOriginal = !parts[1].contains(".");
+                if (isCompactionFlow) {
+                  // in compaction flow,
+                  // it should be merged segment and segment metadata doesn't exists
+                  if (!isOriginal && !metadataSet.contains(parts[1])) {
+                    staleSegments.add(segment);
+                    staleSegmentsId.add(parts[1]);
+                  }
+                } else {
+                  // in loading flow,
+                  // it should be original segment and segment metadata doesn't exists
+                  if (isOriginal && !metadataSet.contains(parts[1])) {
+                    staleSegments.add(segment);
+                    staleSegmentsId.add(parts[1]);
+                  }
+                }
               }
             }
           }
+          // delete segment folders one by one
+          for (CarbonFile staleSegment : staleSegments) {
+            try {
+              CarbonUtil.deleteFoldersAndFiles(staleSegment);
+            } catch (IOException | InterruptedException e) {
+              LOGGER.error("Unable to delete the given path :: " + e.getMessage(), e);
+            }
+          }
+          if (staleSegments.size() > 0) {
+            // get the segment metadata path
+            String segmentFilesLocation =
+                CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath());
+            // delete the segment metadata files also
+            CarbonFile[] staleSegmentMetadataFiles = FileFactory.getCarbonFile(segmentFilesLocation)
+                .listFiles(file -> (staleSegmentsId
+                    .contains(file.getName().split(CarbonCommonConstants.UNDERSCORE)[0])));
+            for (CarbonFile staleSegmentMetadataFile : staleSegmentMetadataFiles) {
+              staleSegmentMetadataFile.delete();
+            }
+          }
+        } else {
+          String errorMessage =
+              "Not able to acquire the Table status lock for partial load deletion for table "
+                  + carbonTable.getDatabaseName() + "." + carbonTable.getTableName()
+                  + ", retry compaction";
+          LOGGER.error(errorMessage);
+          if (isCompactionFlow) {

Review comment:
       This is a little confusing to me: the message is the same, saying "retry compaction", so why throw only inside the `isCompactionFlow` condition? If we want to throw inside the if condition, then the error message should be different when it is not the compaction flow, right?
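
   One way to resolve the point being raised, sketched with a hypothetical helper (not the actual CarbonData code): only the compaction flow, which throws, gets the "retry compaction" suffix.

       public class LockMessageSketch {
         static String lockFailureMessage(String dbName, String tableName, boolean isCompactionFlow) {
           String base = "Not able to acquire the Table status lock for partial load deletion for table "
               + dbName + "." + tableName;
           // Only the compaction flow asks the user to retry compaction.
           return isCompactionFlow ? base + ", retry compaction" : base;
         }

         public static void main(String[] args) {
           System.out.println(lockFailureMessage("default", "t1", true));
           System.out.println(lockFailureMessage("default", "t1", false));
         }
       }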

##########
File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
##########
@@ -251,9 +261,29 @@ public String writeMergeIndexFileBasedOnSegmentFile(String segmentId,
     String path = CarbonTablePath.getSegmentFilesLocation(table.getTablePath())
         + CarbonCommonConstants.FILE_SEPARATOR + newSegmentFileName;
     if (!table.isHivePartitionTable()) {
-      SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path);
-      SegmentFileStore.updateTableStatusFile(table, segmentId, newSegmentFileName,
+      String content = SegmentStatusManager.readFileAsString(path);
+      try {
+        SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path);
+      } catch (Exception ex) {
+        // delete merge index file if created,

Review comment:
       What I meant to say is: here we can log an error stating that writing the segment file failed, and then throw the exception. We only get the bare IOException here, with no custom message, so adding one will make future analysis easier.





[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3871: [CARBONDATA-3986] Fix multiple issues during compaction and concurrent scenarios

GitBox
In reply to this post by GitBox

ajantha-bhat commented on a change in pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#discussion_r488511466



##########
File path: processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
##########
@@ -64,45 +68,85 @@ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
       if (allSegments == null || allSegments.length == 0) {
         return;
       }
-      LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
-      // there is no segment or failed to read tablestatus file.
-      // so it should stop immediately.
-      if (details == null || details.length == 0) {
-        return;
-      }
-      Set<String> metadataSet = new HashSet<>(details.length);
-      for (LoadMetadataDetails detail : details) {
-        metadataSet.add(detail.getLoadName());
-      }
-      List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length);
-      for (CarbonFile segment : allSegments) {
-        String segmentName = segment.getName();
-        // check segment folder pattern
-        if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) {
-          String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE);
-          if (parts.length == 2) {
-            boolean isOriginal = !parts[1].contains(".");
-            if (isCompactionFlow) {
-              // in compaction flow, it should be big segment and segment metadata is not exists
-              if (!isOriginal && !metadataSet.contains(parts[1])) {
-                staleSegments.add(segment);
-              }
-            } else {
-              // in loading flow, it should be original segment and segment metadata is not exists
-              if (isOriginal && !metadataSet.contains(parts[1])) {
-                staleSegments.add(segment);
+      int retryCount = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+              CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+      int maxTimeout = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+              CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
+      ICarbonLock carbonTableStatusLock = CarbonLockFactory
+          .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK);
+      try {
+        if (carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout)) {
+          LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+          // there is no segment or failed to read tablestatus file.
+          // so it should stop immediately.
+          if (details == null || details.length == 0) {
+            return;
+          }
+          Set<String> metadataSet = new HashSet<>(details.length);
+          for (LoadMetadataDetails detail : details) {
+            metadataSet.add(detail.getLoadName());
+          }
+          List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length);
+          Set<String> staleSegmentsId = new HashSet<>(allSegments.length);
+          for (CarbonFile segment : allSegments) {
+            String segmentName = segment.getName();
+            // check segment folder pattern
+            if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) {
+              String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE);
+              if (parts.length == 2) {
+                boolean isOriginal = !parts[1].contains(".");
+                if (isCompactionFlow) {
+                  // in compaction flow,
+                  // it should be merged segment and segment metadata doesn't exists
+                  if (!isOriginal && !metadataSet.contains(parts[1])) {
+                    staleSegments.add(segment);
+                    staleSegmentsId.add(parts[1]);
+                  }
+                } else {
+                  // in loading flow,
+                  // it should be original segment and segment metadata doesn't exists
+                  if (isOriginal && !metadataSet.contains(parts[1])) {
+                    staleSegments.add(segment);
+                    staleSegmentsId.add(parts[1]);
+                  }
+                }
               }
             }
           }
+          // delete segment folders one by one
+          for (CarbonFile staleSegment : staleSegments) {
+            try {
+              CarbonUtil.deleteFoldersAndFiles(staleSegment);
+            } catch (IOException | InterruptedException e) {
+              LOGGER.error("Unable to delete the given path :: " + e.getMessage(), e);
+            }
+          }
+          if (staleSegments.size() > 0) {
+            // get the segment metadata path
+            String segmentFilesLocation =
+                CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath());
+            // delete the segment metadata files also
+            CarbonFile[] staleSegmentMetadataFiles = FileFactory.getCarbonFile(segmentFilesLocation)
+                .listFiles(file -> (staleSegmentsId
+                    .contains(file.getName().split(CarbonCommonConstants.UNDERSCORE)[0])));
+            for (CarbonFile staleSegmentMetadataFile : staleSegmentMetadataFiles) {
+              staleSegmentMetadataFile.delete();
+            }
+          }
+        } else {
+          String errorMessage =
+              "Not able to acquire the Table status lock for partial load deletion for table "
+                  + carbonTable.getDatabaseName() + "." + carbonTable.getTableName()
+                  + ", retry compaction";
+          LOGGER.error(errorMessage);
+          if (isCompactionFlow) {

Review comment:
       OK, the comment is a bit clearer now. I will change it.





[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3871: [CARBONDATA-3986] Fix multiple issues during compaction and concurrent scenarios

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3871:
URL: https://github.com/apache/carbondata/pull/3871#issuecomment-692593687


   Build Success with Spark 2.4.5. Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2337/
   

