[GitHub] [carbondata] marchpure opened a new pull request #3842: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

GitBox

marchpure opened a new pull request #3842:
URL: https://github.com/apache/carbondata/pull/3842


    ### Why is this PR needed?
    Currently, cleaning temp index files in the merge index flow takes a long time (sometimes 2 to 3 minutes), which should be optimized.
   
    ### What changes were proposed in this PR?
    Clean temp index files in parallel in the merge index flow.
       
    ### Does this PR introduce any user interface change?
    - No
   
    ### Is any new testcase added?
    - Yes
   
       
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3842: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

GitBox

CarbonDataQA1 commented on pull request #3842:
URL: https://github.com/apache/carbondata/pull/3842#issuecomment-658033201


   Build Failed with Spark 2.3.4. Please check CI: http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3384/
   



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3842: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

GitBox

CarbonDataQA1 commented on pull request #3842:
URL: https://github.com/apache/carbondata/pull/3842#issuecomment-658033328


   Build Success with Spark 2.4.5. Please check CI: http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1643/
   



[GitHub] [carbondata] marchpure commented on pull request #3842: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

GitBox

marchpure commented on pull request #3842:
URL: https://github.com/apache/carbondata/pull/3842#issuecomment-658036209


   retest this please



[GitHub] [carbondata] QiangCai commented on a change in pull request #3842: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

GitBox

QiangCai commented on a change in pull request #3842:
URL: https://github.com/apache/carbondata/pull/3842#discussion_r454233869



##########
File path: integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
##########
@@ -157,21 +157,21 @@ object CarbonMergeFilesRDD {
     if (carbonTable.isHivePartitionTable && !StringUtils.isEmpty(tempFolderPath)) {
       // remove all tmp folder of index files
       val startDelete = System.currentTimeMillis()
-      val numThreads = Math.min(Math.max(partitionInfo.size(), 1), 10)
-      val executorService = Executors.newFixedThreadPool(numThreads)
-      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
-      partitionInfo
-        .asScala
-        .map { partitionPath =>
-          executorService.submit(new Runnable {
-            override def run(): Unit = {
-              ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
-              FileFactory.deleteAllCarbonFilesOfDir(
-                FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath))
-            }
-          })
+      val allTmpDirs = partitionInfo
+        .asScala.map { partitionPath =>
+          partitionPath + CarbonCommonConstants.FILE_SEPARATOR + tempFolderPath
         }
-        .map(_.get())
+      val allTmpFiles = allTmpDirs.map { partitionDir =>
+          FileFactory.getCarbonFile(partitionDir).listFiles()
+        }.flatten.map(_.getAbsolutePath)
+      // delete tmp files in parallel
+      sparkSession.sparkContext.parallelize(allTmpFiles).map {
+        FileFactory.getCarbonFile(_).delete()
+      }.collect()
+      // delete tmp dir in parallel
+      sparkSession.sparkContext.parallelize(allTmpDirs).map {
+        FileFactory.getCarbonFile(_).delete()
+      }.collect()

Review comment:
       You can just use a Spark job instead of multiple threads, with one task deleting one directory together with its files.
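One way to read this suggestion, sketched here under stated assumptions: each Spark task receives one temp directory path and does both the listing and the deletion itself, so the driver neither lists files nor manages a thread pool. The sketch uses `java.io.File` on a local filesystem as a stand-in for CarbonData's `FileFactory`/`CarbonFile`, and the object name `TmpIndexCleanup` is hypothetical.

```scala
import java.io.File

// Hypothetical helper: java.io.File stands in for CarbonData's FileFactory/CarbonFile.
object TmpIndexCleanup {

  /** Body of one cleanup task: list and delete the files inside a single
    * temp index directory, then delete the directory itself.
    * Assumes the directory is flat; a non-empty child directory would fail to delete.
    * Returns true only if nothing under `dirPath` remains afterwards. */
  def deleteDirWithFiles(dirPath: String): Boolean = {
    val dir = new File(dirPath)
    if (!dir.exists()) {
      // Nothing left to do, e.g. an earlier attempt of this task already cleaned up.
      true
    } else {
      // listFiles() returns null when the path is not a readable directory.
      val children = Option(dir.listFiles()).getOrElse(Array.empty[File])
      // Attempt every delete (no short-circuit), then require that all succeeded.
      val allFilesDeleted = children.map(_.delete()).forall(identity)
      allFilesDeleted && dir.delete()
    }
  }
}
```

On the driver this would be invoked roughly as `sparkSession.sparkContext.parallelize(allTmpDirs, math.max(allTmpDirs.size, 1)).map(TmpIndexCleanup.deleteDirWithFiles).collect()`, giving one partition per directory (`parallelize` rejects zero partitions, hence the `max`). Treating an already-missing directory as success keeps the task safe to retry. In the real code the task would go through `FileFactory`, which may need the session's Hadoop configuration on the executors; that part is not shown.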
   
   





[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3842: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

GitBox

CarbonDataQA1 commented on pull request #3842:
URL: https://github.com/apache/carbondata/pull/3842#issuecomment-658101120


   Build Success with Spark 2.3.4. Please check CI: http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3387/
   



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3842: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

GitBox

CarbonDataQA1 commented on pull request #3842:
URL: https://github.com/apache/carbondata/pull/3842#issuecomment-658101946


   Build Success with Spark 2.4.5. Please check CI: http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1646/
   



[GitHub] [carbondata] QiangCai commented on a change in pull request #3842: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

GitBox

QiangCai commented on a change in pull request #3842:
URL: https://github.com/apache/carbondata/pull/3842#discussion_r456206506



##########
File path: integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
##########
@@ -157,21 +157,21 @@ object CarbonMergeFilesRDD {
     if (carbonTable.isHivePartitionTable && !StringUtils.isEmpty(tempFolderPath)) {
       // remove all tmp folder of index files
       val startDelete = System.currentTimeMillis()
-      val numThreads = Math.min(Math.max(partitionInfo.size(), 1), 10)
-      val executorService = Executors.newFixedThreadPool(numThreads)
-      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
-      partitionInfo
-        .asScala
-        .map { partitionPath =>
-          executorService.submit(new Runnable {
-            override def run(): Unit = {
-              ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
-              FileFactory.deleteAllCarbonFilesOfDir(
-                FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath))
-            }
-          })
+      val allTmpDirs = partitionInfo
+        .asScala.map { partitionPath =>
+          partitionPath + CarbonCommonConstants.FILE_SEPARATOR + tempFolderPath
         }
-        .map(_.get())
+      val allTmpFiles = allTmpDirs.map { partitionDir =>
+          FileFactory.getCarbonFile(partitionDir).listFiles()

Review comment:
       If the load creates too many partitions, this file listing will also take a long time.





[GitHub] [carbondata] QiangCai commented on a change in pull request #3842: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

GitBox

QiangCai commented on a change in pull request #3842:
URL: https://github.com/apache/carbondata/pull/3842#discussion_r456206506



##########
File path: integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
##########
@@ -157,21 +157,21 @@ object CarbonMergeFilesRDD {
     if (carbonTable.isHivePartitionTable && !StringUtils.isEmpty(tempFolderPath)) {
       // remove all tmp folder of index files
       val startDelete = System.currentTimeMillis()
-      val numThreads = Math.min(Math.max(partitionInfo.size(), 1), 10)
-      val executorService = Executors.newFixedThreadPool(numThreads)
-      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
-      partitionInfo
-        .asScala
-        .map { partitionPath =>
-          executorService.submit(new Runnable {
-            override def run(): Unit = {
-              ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
-              FileFactory.deleteAllCarbonFilesOfDir(
-                FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath))
-            }
-          })
+      val allTmpDirs = partitionInfo
+        .asScala.map { partitionPath =>
+          partitionPath + CarbonCommonConstants.FILE_SEPARATOR + tempFolderPath
         }
-        .map(_.get())
+      val allTmpFiles = allTmpDirs.map { partitionDir =>
+          FileFactory.getCarbonFile(partitionDir).listFiles()

Review comment:
       If the load creates too many partitions, this listFiles call will also take a long time.





[GitHub] [carbondata] VenuReddy2103 commented on a change in pull request #3842: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

GitBox

VenuReddy2103 commented on a change in pull request #3842:
URL: https://github.com/apache/carbondata/pull/3842#discussion_r457648464



##########
File path: integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
##########
@@ -157,21 +157,21 @@ object CarbonMergeFilesRDD {
     if (carbonTable.isHivePartitionTable && !StringUtils.isEmpty(tempFolderPath)) {
       // remove all tmp folder of index files
       val startDelete = System.currentTimeMillis()
-      val numThreads = Math.min(Math.max(partitionInfo.size(), 1), 10)
-      val executorService = Executors.newFixedThreadPool(numThreads)
-      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
-      partitionInfo
-        .asScala
-        .map { partitionPath =>
-          executorService.submit(new Runnable {
-            override def run(): Unit = {
-              ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
-              FileFactory.deleteAllCarbonFilesOfDir(
-                FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath))
-            }
-          })
+      val allTmpDirs = partitionInfo
+        .asScala.map { partitionPath =>
+          partitionPath + CarbonCommonConstants.FILE_SEPARATOR + tempFolderPath
         }
-        .map(_.get())
+      val allTmpFiles = allTmpDirs.map { partitionDir =>
+          FileFactory.getCarbonFile(partitionDir).listFiles()

Review comment:
       Instead, how about removing these .carbonindex files and the .tmp directory in `CarbonMergeFilesRDD.internalCompute` itself, once the corresponding carbonindexmerge file has been generated successfully? That would distribute this cleanup across partition-level tasks. Also, we would already have called `listFiles()` there before reading the index files to generate the carbonindexmerge file.





[GitHub] [carbondata] QiangCai commented on a change in pull request #3842: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

GitBox

QiangCai commented on a change in pull request #3842:
URL: https://github.com/apache/carbondata/pull/3842#discussion_r457795146



##########
File path: integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
##########
@@ -157,21 +157,21 @@ object CarbonMergeFilesRDD {
     if (carbonTable.isHivePartitionTable && !StringUtils.isEmpty(tempFolderPath)) {
       // remove all tmp folder of index files
       val startDelete = System.currentTimeMillis()
-      val numThreads = Math.min(Math.max(partitionInfo.size(), 1), 10)
-      val executorService = Executors.newFixedThreadPool(numThreads)
-      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
-      partitionInfo
-        .asScala
-        .map { partitionPath =>
-          executorService.submit(new Runnable {
-            override def run(): Unit = {
-              ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
-              FileFactory.deleteAllCarbonFilesOfDir(
-                FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath))
-            }
-          })
+      val allTmpDirs = partitionInfo
+        .asScala.map { partitionPath =>
+          partitionPath + CarbonCommonConstants.FILE_SEPARATOR + tempFolderPath
         }
-        .map(_.get())
+      val allTmpFiles = allTmpDirs.map { partitionDir =>
+          FileFactory.getCarbonFile(partitionDir).listFiles()

Review comment:
       If CarbonMergeFilesRDD.internalCompute removes them, the task cannot be retried successfully after a failure, since the retried attempt would no longer find the index files it needs to merge.
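The retry concern can be illustrated with a small in-memory model. It is hypothetical and is not CarbonData's `CarbonMergeFilesRDD`: the "file system" is a mutable map, the merge task concatenates every `.carbonindex` file under one temp directory into a merge file, and the first attempt is forced to fail after its side effects, as if the executor were lost before the task reported success. The names `MergeRetrySketch`, `mergeTask` and `runWithRetries` are invented for this sketch.

```scala
import scala.collection.mutable

// Hypothetical model of one partition's temp index directory and its merge task.
object MergeRetrySketch {
  // Simulated file system: path -> content.
  type Fs = mutable.Map[String, String]

  /** Merge every ".carbonindex" file under `dir` into one merge file.
    * deleteInputs = true models deleting the inputs inside the task itself.
    * failAfterWork = true models the task dying before it reports success. */
  def mergeTask(fs: Fs, dir: String, deleteInputs: Boolean, failAfterWork: Boolean): Unit = {
    val inputs = fs.keys.filter(k => k.startsWith(dir + "/") && k.endsWith(".carbonindex")).toSeq.sorted
    if (inputs.isEmpty) throw new IllegalStateException(s"no index files left under $dir")
    fs(dir + "/0.carbonindexmerge") = inputs.map(fs).mkString(",")
    if (deleteInputs) inputs.foreach(k => fs.remove(k))
    if (failAfterWork) throw new RuntimeException("task lost before reporting success")
  }

  /** Re-run the task up to maxAttempts times, like Spark task retries.
    * Only the first attempt is forced to fail. Returns the successful attempt, if any. */
  def runWithRetries(fs: Fs, dir: String, deleteInputs: Boolean, maxAttempts: Int): Option[Int] = {
    var attempt = 1
    while (attempt <= maxAttempts) {
      try {
        mergeTask(fs, dir, deleteInputs, failAfterWork = attempt == 1)
        return Some(attempt)
      } catch {
        case _: RuntimeException => attempt += 1
      }
    }
    None
  }
}
```

Starting from `mutable.Map("p1/.tmp/a.carbonindex" -> "a", "p1/.tmp/b.carbonindex" -> "b")` with `maxAttempts = 2`, `runWithRetries(fs, "p1/.tmp", deleteInputs = false, maxAttempts = 2)` returns `Some(2)`, while the same call with `deleteInputs = true` returns `None`, because the second attempt finds no inputs. Deleting only after the merge job has completed, as the driver-side cleanup in the diff does, avoids this.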





[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3842: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow

GitBox

CarbonDataQA1 commented on pull request #3842:
URL: https://github.com/apache/carbondata/pull/3842#issuecomment-704121200


   Build Success with Spark 2.3.4. Please check CI: http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4311/
   

