[GitHub] [carbondata] marchpure opened a new pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

GitBox
marchpure opened a new pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622
 
 
    ### Why is this PR needed?
    Currently, cleaning temp index files in the merge index flow takes a long time (sometimes 2 to 3 minutes), so it should be optimized.
   
    ### What changes were proposed in this PR?
    Clean temp index files in parallel in the merge index flow.
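A minimal local-JVM sketch of the idea: list every temp index file up front, then delete the flat list concurrently. The PR itself distributes the work with Spark (`sparkContext.parallelize(...).map(...).collect()`); this sketch instead assumes a bounded thread pool and uses `java.io.File` as a stand-in for `CarbonFile`. The object name and the 5-minute timeout are illustrative assumptions, and the cap of 10 threads mirrors the thread pool that the diff removes.

```scala
import java.io.File
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParallelCleanSketch {
  // Deletes every path concurrently and returns the paths that could not be removed.
  def deleteInParallel(paths: Seq[String], maxThreads: Int = 10): Seq[String] = {
    val pool = Executors.newFixedThreadPool(math.min(math.max(paths.size, 1), maxThreads))
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val results = paths.map { p =>
        Future {
          val f = new File(p)
          // A file that is already gone counts as successfully deleted.
          (p, f.delete() || !f.exists())
        }
      }
      // Wait for all deletions, then keep only the failures.
      Await.result(Future.sequence(results), 5.minutes).collect { case (p, false) => p }
    } finally {
      pool.shutdown()
    }
  }
}
```

Returning the leftover paths (rather than `Unit`) lets a caller log or retry whatever could not be removed.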
       
    ### Does this PR introduce any user interface change?
    - No
   
    ### Is any new testcase added?
    - No
   
       
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[hidden email]


With regards,
Apache Git Services
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

GitBox
CarbonDataQA1 commented on issue #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#issuecomment-586551417
 
 
   Build Success with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/299/
   

[GitHub] [carbondata] CarbonDataQA1 commented on issue #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

GitBox
CarbonDataQA1 commented on issue #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#issuecomment-586554100
 
 
   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/2003/
   

[GitHub] [carbondata] marchpure commented on issue #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

GitBox
marchpure commented on issue #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#issuecomment-586568016
 
 
   retest this please

[GitHub] [carbondata] CarbonDataQA1 commented on issue #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

GitBox
CarbonDataQA1 commented on issue #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#issuecomment-586569602
 
 
   Build Success with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/306/
   

[GitHub] [carbondata] CarbonDataQA1 commented on issue #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

GitBox
CarbonDataQA1 commented on issue #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#issuecomment-586575134
 
 
   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/2009/
   

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

GitBox
ajantha-bhat commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#discussion_r379984005
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
 ##########
 @@ -191,6 +193,27 @@ object CarbonMergeFilesRDD {
     mergeIndexSize
   }
 
+  /**
+   * delete the file with retry
+   */
+  def deleteFileWithRetry(filePath: String, _retryTimes: Integer): Unit = {
+    var retryTimes = _retryTimes
+    while (!deleteFile(filePath) && retryTimes > 0) {
 
 Review comment:
   Why not use `deleteAllCarbonFilesOfDir`?
   
   Because of line 175: you have to loop again to delete the directories, which may be slow on S3.
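The concern here is that deleting the files first and then looping again over the directories costs a second pass. A minimal sketch of the alternative, a post-order recursive delete that removes a directory's contents and then the directory itself in one walk. It uses `java.io.File` locally and does not reflect how `FileFactory.deleteAllCarbonFilesOfDir` is actually implemented or what either approach costs on S3.

```scala
import java.io.File

object RecursiveDeleteSketch {
  // Post-order delete: children first, then the directory itself.
  // Returns true when nothing remains at `root`.
  def deleteTree(root: File): Boolean = {
    if (root.isDirectory) {
      // listFiles() returns null on an I/O error; treat that as an empty listing.
      Option(root.listFiles()).getOrElse(Array.empty[File]).foreach(deleteTree)
    }
    root.delete() || !root.exists()
  }
}
```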

[GitHub] [carbondata] jackylk commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

GitBox
jackylk commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#discussion_r380415731
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
 ##########
 @@ -157,21 +157,23 @@ object CarbonMergeFilesRDD {
     if (carbonTable.isHivePartitionTable && !StringUtils.isEmpty(tempFolderPath)) {
       // remove all tmp folder of index files
       val startDelete = System.currentTimeMillis()
-      val numThreads = Math.min(Math.max(partitionInfo.size(), 1), 10)
-      val executorService = Executors.newFixedThreadPool(numThreads)
-      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+      // get all files in all folder of index files
+      val allTmpFiles = partitionInfo
+        .asScala
+        .map { partitionPath =>
 
 Review comment:
   Maybe you can use `flatMap` here.
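A small sketch of the suggestion: `map` followed by `flatten` collapses into a single `flatMap`. `listTmpFiles` is a hypothetical stand-in for `FileFactory.getCarbonFile(...).listFiles()`, and plain Scala `Seq`s replace the `asScala` conversion of `partitionInfo`.

```scala
object FlatMapSketch {
  // Hypothetical stand-in for listing the files under <partition>/<tempFolder>.
  def listTmpFiles(partitionPath: String, tempFolder: String): Seq[String] =
    Seq(s"$partitionPath/$tempFolder/a.carbonindex", s"$partitionPath/$tempFolder/b.carbonindex")

  val partitions = Seq("/t/p=1", "/t/p=2")

  // Shape used in the diff: map to nested lists, then flatten.
  val viaMapFlatten: Seq[String] = partitions.map(p => listTmpFiles(p, "tmp")).flatten

  // Equivalent single step with flatMap.
  val viaFlatMap: Seq[String] = partitions.flatMap(p => listTmpFiles(p, "tmp"))
}
```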

[GitHub] [carbondata] jackylk commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

GitBox
jackylk commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#discussion_r380416336
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
 ##########
 @@ -157,21 +157,23 @@ object CarbonMergeFilesRDD {
     if (carbonTable.isHivePartitionTable && !StringUtils.isEmpty(tempFolderPath)) {
       // remove all tmp folder of index files
       val startDelete = System.currentTimeMillis()
-      val numThreads = Math.min(Math.max(partitionInfo.size(), 1), 10)
-      val executorService = Executors.newFixedThreadPool(numThreads)
-      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+      // get all files in all folder of index files
+      val allTmpFiles = partitionInfo
+        .asScala
+        .map { partitionPath =>
+          FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath)
+            .listFiles().toList
+        }.toList.flatten.map(_.getAbsolutePath)
+      // delete files in parallel
+      sparkSession.sparkContext.parallelize(allTmpFiles).map {
+        deleteFileWithRetry(_, 3)
+      }.collect()
+      // delete dirs
       partitionInfo
         .asScala
         .map { partitionPath =>
-          executorService.submit(new Runnable {
-            override def run(): Unit = {
-              ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
-              FileFactory.deleteAllCarbonFilesOfDir(
-                FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath))
-            }
-          })
+          FileFactory.deleteFile(partitionPath + "/" + tempFolderPath)
 
 Review comment:
   This can also be implemented with an RDD.
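One way to read this suggestion is to run the directory phase through the same parallel mechanism as the file phase instead of a sequential loop. A minimal local sketch, assuming Scala `Future`s on the global execution context in place of a Spark RDD, with `java.io.File` standing in for `CarbonFile`; the object and helper names are hypothetical. Each phase waits for completion, so directories are only removed after their files are gone.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object TwoPhaseDeleteSketch {
  // Runs `op` on every item concurrently and blocks until all of them finish.
  def forAllInParallel[T](items: Seq[T])(op: T => Boolean): Seq[Boolean] =
    Await.result(Future.sequence(items.map(i => Future(op(i)))), 5.minutes)

  // Phase 1 deletes the files; phase 2 deletes the now-empty temp directories
  // with the same parallel helper.
  def clean(files: Seq[java.io.File], dirs: Seq[java.io.File]): Boolean = {
    val filesOk = forAllInParallel(files)(f => f.delete() || !f.exists())
    val dirsOk = forAllInParallel(dirs)(d => d.delete() || !d.exists())
    filesOk.forall(identity) && dirsOk.forall(identity)
  }
}
```

For blocking file I/O a dedicated thread pool would be preferable to the global execution context; it is used here only to keep the sketch short.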

[GitHub] [carbondata] jackylk commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

GitBox
jackylk commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#discussion_r380416336
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
 ##########
 @@ -157,21 +157,23 @@ object CarbonMergeFilesRDD {
     if (carbonTable.isHivePartitionTable && !StringUtils.isEmpty(tempFolderPath)) {
       // remove all tmp folder of index files
       val startDelete = System.currentTimeMillis()
-      val numThreads = Math.min(Math.max(partitionInfo.size(), 1), 10)
-      val executorService = Executors.newFixedThreadPool(numThreads)
-      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+      // get all files in all folder of index files
+      val allTmpFiles = partitionInfo
+        .asScala
+        .map { partitionPath =>
+          FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath)
+            .listFiles().toList
+        }.toList.flatten.map(_.getAbsolutePath)
+      // delete files in parallel
+      sparkSession.sparkContext.parallelize(allTmpFiles).map {
+        deleteFileWithRetry(_, 3)
+      }.collect()
+      // delete dirs
       partitionInfo
         .asScala
         .map { partitionPath =>
-          executorService.submit(new Runnable {
-            override def run(): Unit = {
-              ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
-              FileFactory.deleteAllCarbonFilesOfDir(
-                FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath))
-            }
-          })
+          FileFactory.deleteFile(partitionPath + "/" + tempFolderPath)
 
 Review comment:
   This can also be implemented with an RDD if the number of files exceeds the threshold.

[GitHub] [carbondata] jackylk commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

GitBox
jackylk commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#discussion_r380416668
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
 ##########
 @@ -157,21 +157,23 @@ object CarbonMergeFilesRDD {
     if (carbonTable.isHivePartitionTable && !StringUtils.isEmpty(tempFolderPath)) {
       // remove all tmp folder of index files
       val startDelete = System.currentTimeMillis()
-      val numThreads = Math.min(Math.max(partitionInfo.size(), 1), 10)
-      val executorService = Executors.newFixedThreadPool(numThreads)
-      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+      // get all files in all folder of index files
+      val allTmpFiles = partitionInfo
+        .asScala
+        .map { partitionPath =>
+          FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath)
+            .listFiles().toList
+        }.toList.flatten.map(_.getAbsolutePath)
+      // delete files in parallel
+      sparkSession.sparkContext.parallelize(allTmpFiles).map {
 
 Review comment:
   Suggest adding a threshold check: use an RDD only when `allTmpFiles.length` exceeds the threshold.
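A sketch of the suggested threshold check: take the distributed path only for large batches and delete locally otherwise. The threshold value and all names here are hypothetical (the review does not name a value), and the two deletion strategies are passed in as functions so the sketch does not depend on Spark.

```scala
object ThresholdDispatchSketch {
  // Hypothetical threshold; tune to the cost of launching a distributed job.
  val ParallelDeleteThreshold = 100

  // Returns true when the distributed strategy was used.
  def deleteAll(paths: Seq[String], threshold: Int = ParallelDeleteThreshold)(
      distributed: Seq[String] => Unit,
      local: String => Unit): Boolean =
    if (paths.length > threshold) {
      // Large batch: hand the whole list to the distributed strategy (e.g. an RDD job).
      distributed(paths)
      true
    } else {
      // Small batch: job start-up overhead would dominate, so delete in place.
      paths.foreach(local)
      false
    }
}
```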

[GitHub] [carbondata] jackylk commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

GitBox
jackylk commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#discussion_r380416741
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
 ##########
 @@ -191,6 +193,27 @@ object CarbonMergeFilesRDD {
     mergeIndexSize
   }
 
+  /**
+   * delete the file with retry
+   */
+  def deleteFileWithRetry(filePath: String, _retryTimes: Integer): Unit = {
 
 Review comment:
   Make it private.

[GitHub] [carbondata] jackylk commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

GitBox
jackylk commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#discussion_r380416759
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
 ##########
 @@ -191,6 +193,27 @@ object CarbonMergeFilesRDD {
     mergeIndexSize
   }
 
+  /**
+   * delete the file with retry
+   */
+  def deleteFileWithRetry(filePath: String, _retryTimes: Integer): Unit = {
+    var retryTimes = _retryTimes
+    while (!deleteFile(filePath) && retryTimes > 0) {
+      retryTimes -= 1
+    }
+  }
+
+  /**
+   * delete the file
+   */
+  def deleteFile(filePath: String): Boolean = {
 
 Review comment:
   Make it private.
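A sketch of the retry helper with `private` visibility, as suggested, behind one public entry point. It follows the same attempt-then-retry loop as `deleteFileWithRetry` in the diff, but returns whether the delete eventually succeeded so a caller could report leftovers. The object name, `cleanUp`, and the injected `delete` function are hypothetical.

```scala
object RetrySketch {
  // Public entry point: returns the paths that still could not be deleted.
  def cleanUp(paths: Seq[String], delete: String => Boolean, retries: Int = 3): Seq[String] =
    paths.filterNot(p => deleteWithRetry(p, delete, retries))

  // Tries once, then up to `retries` more times; not visible outside the object.
  private def deleteWithRetry(path: String, delete: String => Boolean, retries: Int): Boolean = {
    var remaining = retries
    var ok = delete(path)
    while (!ok && remaining > 0) {
      remaining -= 1
      ok = delete(path)
    }
    ok
  }
}
```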

[GitHub] [carbondata] jackylk commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

GitBox
jackylk commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#discussion_r380416855
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
 ##########
 @@ -191,6 +193,27 @@ object CarbonMergeFilesRDD {
     mergeIndexSize
   }
 
+  /**
+   * delete the file with retry
+   */
+  def deleteFileWithRetry(filePath: String, _retryTimes: Integer): Unit = {
+    var retryTimes = _retryTimes
+    while (!deleteFile(filePath) && retryTimes > 0) {
+      retryTimes -= 1
+    }
+  }
+
+  /**
+   * delete the file
+   */
+  def deleteFile(filePath: String): Boolean = {
+    val success = FileFactory.deleteFile(filePath)
+    if (!success && FileFactory.isFileExist(filePath)) {
 
 Review comment:
   Is `if (!success)` alone not enough?
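One plausible reason for the extra `isFileExist` check (the body of the `if` is cut off in the quoted diff, so this is an assumption): a delete call can return `false` when the file is already gone, for example because an earlier attempt or another task removed it, and checking existence afterwards separates that case from a genuine failure. A minimal sketch with `java.io.File`, whose `delete()` returns `false` for a missing file:

```scala
import java.io.File

object IdempotentDeleteSketch {
  // delete() is false both on a real failure and when the file no longer exists;
  // treating "already gone" as success keeps retries for genuine failures only.
  def deleteFile(path: String): Boolean = {
    val f = new File(path)
    f.delete() || !f.exists()
  }
}
```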

[GitHub] [carbondata] Indhumathi27 commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

GitBox
Indhumathi27 commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#discussion_r382525176
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
 ##########
 @@ -157,21 +157,23 @@ object CarbonMergeFilesRDD {
     if (carbonTable.isHivePartitionTable && !StringUtils.isEmpty(tempFolderPath)) {
       // remove all tmp folder of index files
       val startDelete = System.currentTimeMillis()
-      val numThreads = Math.min(Math.max(partitionInfo.size(), 1), 10)
-      val executorService = Executors.newFixedThreadPool(numThreads)
-      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+      // get all files in all folder of index files
+      val allTmpFiles = partitionInfo
+        .asScala
+        .map { partitionPath =>
+          FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath)
 
 Review comment:
   You can use `CarbonCommonConstants.FILE_SEPARATOR` instead of `"/"`.
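A sketch of the suggestion: build the temp folder path from one shared separator constant rather than a string literal. `FileSeparator` is a local stand-in assumed to equal `CarbonCommonConstants.FILE_SEPARATOR` (taken here to be `"/"`), and the `stripSuffix` guard against a trailing separator is an addition not present in the PR.

```scala
object PathJoinSketch {
  // Local stand-in, assumed to match CarbonCommonConstants.FILE_SEPARATOR.
  val FileSeparator = "/"

  // Builds <partitionPath>/<tempFolderPath> from the shared constant,
  // avoiding a doubled separator if partitionPath already ends with one.
  def tempFolderOf(partitionPath: String, tempFolderPath: String): String =
    partitionPath.stripSuffix(FileSeparator) + FileSeparator + tempFolderPath
}
```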
   

[GitHub] [carbondata] Indhumathi27 commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

GitBox
Indhumathi27 commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#discussion_r382525290
 
 

 ##########
 File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
 ##########
 @@ -157,21 +157,23 @@ object CarbonMergeFilesRDD {
     if (carbonTable.isHivePartitionTable && !StringUtils.isEmpty(tempFolderPath)) {
       // remove all tmp folder of index files
       val startDelete = System.currentTimeMillis()
-      val numThreads = Math.min(Math.max(partitionInfo.size(), 1), 10)
-      val executorService = Executors.newFixedThreadPool(numThreads)
-      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+      // get all files in all folder of index files
+      val allTmpFiles = partitionInfo
+        .asScala
+        .map { partitionPath =>
+          FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath)
+            .listFiles().toList
+        }.toList.flatten.map(_.getAbsolutePath)
+      // delete files in parallel
+      sparkSession.sparkContext.parallelize(allTmpFiles).map {
+        deleteFileWithRetry(_, 3)
+      }.collect()
+      // delete dirs
       partitionInfo
         .asScala
         .map { partitionPath =>
-          executorService.submit(new Runnable {
-            override def run(): Unit = {
-              ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
-              FileFactory.deleteAllCarbonFilesOfDir(
-                FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath))
-            }
-          })
+          FileFactory.deleteFile(partitionPath + "/" + tempFolderPath)
 
 Review comment:
   You can use `CarbonCommonConstants.FILE_SEPARATOR` instead of `"/"`.
