marchpure opened a new pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622

### Why is this PR needed?
Currently, cleaning temp index files in the merge index flow takes a long time, sometimes 2 to 3 minutes, and should be optimized.

### What changes were proposed in this PR?
Clean temp index files in parallel in the merge index flow.

### Does this PR introduce any user interface change?
- No

### Is any new testcase added?
- No

----------------------------------------------------------------
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services
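Below is a minimal sketch of the per-file parallel deletion the PR describes. It is written without Spark so it runs standalone. It assumes local `java.nio` paths in place of CarbonData's `FileFactory` (which also targets HDFS and S3), and `ParallelTempFileCleaner` and `deleteInParallel` are hypothetical names. The pool size reuses the `Math.min(Math.max(partitionInfo.size(), 1), 10)` rule from the removed code. The difference is that tasks are submitted per file rather than per partition, which is where the extra parallelism would come from.

```scala
import java.nio.file.{Files, Path}
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Try

object ParallelTempFileCleaner {
  /**
   * Deletes every file concurrently on a bounded thread pool and returns the
   * paths whose deletion threw an exception (an already-missing file counts as done).
   * Local-filesystem sketch only; FileFactory is not used here.
   */
  def deleteInParallel(files: Seq[Path], maxThreads: Int = 10): Seq[Path] = {
    // Same sizing rule as the removed code: at least 1 thread, at most maxThreads.
    val numThreads = math.min(math.max(files.size, 1), maxThreads)
    val pool = Executors.newFixedThreadPool(numThreads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // One task per file rather than one task per partition directory.
      val tasks = files.map(f => Future(f -> Try(Files.deleteIfExists(f)).isSuccess))
      Await.result(Future.sequence(tasks), 5.minutes).collect { case (f, false) => f }
    } finally {
      pool.shutdown()
    }
  }
}
```

`Files.deleteIfExists` returns normally for a file that is already gone, so only files whose deletion throws are reported back, and a caller could retry those.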
CarbonDataQA1 commented on issue #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#issuecomment-586551417
Build Success with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/299/
CarbonDataQA1 commented on issue #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#issuecomment-586554100
Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/2003/
marchpure commented on issue #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#issuecomment-586568016
retest this please
CarbonDataQA1 commented on issue #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#issuecomment-586569602
Build Success with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/306/
CarbonDataQA1 commented on issue #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#issuecomment-586575134
Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/2009/
ajantha-bhat commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#discussion_r379984005

File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala

```diff
@@ -191,6 +193,27 @@ object CarbonMergeFilesRDD {
     mergeIndexSize
   }
+  /**
+   * delete the file with retry
+   */
+  def deleteFileWithRetry(filePath: String, _retryTimes: Integer): Unit = {
+    var retryTimes = _retryTimes
+    while (!deleteFile(filePath) && retryTimes > 0) {
```

Review comment: Why not use `deleteAllCarbonFilesOfDir`? Otherwise, at line 175 you have to loop again to delete the directories, which may be slow on S3.
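The reviewer's point is that deleting each temp directory together with its contents avoids a second pass over the directories. Below is a local-filesystem sketch of such a one-pass delete. It uses `java.nio` as a stand-in for `FileFactory.deleteAllCarbonFilesOfDir`, whose actual implementation and S3 behavior are not assumed here, and `TempDirCleaner` is a hypothetical name.

```scala
import java.nio.file.{Files, Path}
import scala.collection.mutable.ListBuffer

object TempDirCleaner {
  /** Deletes `dir` and everything below it in one walk (java.nio sketch, not FileFactory). */
  def deleteRecursively(dir: Path): Unit = {
    if (Files.exists(dir)) {
      val stream = Files.walk(dir) // the walk includes `dir` itself
      val entries = ListBuffer.empty[Path]
      try {
        val it = stream.iterator()
        while (it.hasNext) entries += it.next()
      } finally {
        stream.close()
      }
      // Deepest paths first, so each directory is already empty when it is deleted.
      entries.toList.sortBy(p => -p.getNameCount).foreach(p => Files.delete(p))
    }
  }
}
```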
jackylk commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#discussion_r380415731

File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala

```diff
@@ -157,21 +157,23 @@ object CarbonMergeFilesRDD {
     if (carbonTable.isHivePartitionTable && !StringUtils.isEmpty(tempFolderPath)) {
       // remove all tmp folder of index files
       val startDelete = System.currentTimeMillis()
-      val numThreads = Math.min(Math.max(partitionInfo.size(), 1), 10)
-      val executorService = Executors.newFixedThreadPool(numThreads)
-      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+      // get all files in all folder of index files
+      val allTmpFiles = partitionInfo
+        .asScala
+        .map { partitionPath =>
```

Review comment: Maybe you can use `flatMap` here.
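As suggested, a `map` followed by `toList.flatten` can be collapsed into a single `flatMap`. In this sketch, the `list` function is a hypothetical stand-in for `FileFactory.getCarbonFile(...).listFiles()` mapped to absolute paths, so the two forms can be compared without a filesystem.

```scala
object TmpFileListing {
  /** The PR's shape: map each partition to its file list, then flatten. */
  def viaMapFlatten(partitions: Seq[String], tempFolder: String)(list: String => Seq[String]): List[String] =
    partitions.map(p => list(p + "/" + tempFolder)).toList.flatten

  /** The same result with a single flatMap, as suggested in review. */
  def viaFlatMap(partitions: Seq[String], tempFolder: String)(list: String => Seq[String]): List[String] =
    partitions.flatMap(p => list(p + "/" + tempFolder)).toList
}
```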
jackylk commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#discussion_r380416336

File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala

```diff
@@ -157,21 +157,23 @@ object CarbonMergeFilesRDD {
     if (carbonTable.isHivePartitionTable && !StringUtils.isEmpty(tempFolderPath)) {
       // remove all tmp folder of index files
       val startDelete = System.currentTimeMillis()
-      val numThreads = Math.min(Math.max(partitionInfo.size(), 1), 10)
-      val executorService = Executors.newFixedThreadPool(numThreads)
-      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+      // get all files in all folder of index files
+      val allTmpFiles = partitionInfo
+        .asScala
+        .map { partitionPath =>
+          FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath)
+            .listFiles().toList
+        }.toList.flatten.map(_.getAbsolutePath)
+      // delete files in parallel
+      sparkSession.sparkContext.parallelize(allTmpFiles).map {
+        deleteFileWithRetry(_, 3)
+      }.collect()
+      // delete dirs
       partitionInfo
         .asScala
         .map { partitionPath =>
-          executorService.submit(new Runnable {
-            override def run(): Unit = {
-              ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
-              FileFactory.deleteAllCarbonFilesOfDir(
-                FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath))
-            }
-          })
+          FileFactory.deleteFile(partitionPath + "/" + tempFolderPath)
```

Review comment: This can also be implemented as an RDD.
jackylk commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#discussion_r380416336

File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala

```diff
@@ -157,21 +157,23 @@ object CarbonMergeFilesRDD {
     if (carbonTable.isHivePartitionTable && !StringUtils.isEmpty(tempFolderPath)) {
       // remove all tmp folder of index files
       val startDelete = System.currentTimeMillis()
-      val numThreads = Math.min(Math.max(partitionInfo.size(), 1), 10)
-      val executorService = Executors.newFixedThreadPool(numThreads)
-      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+      // get all files in all folder of index files
+      val allTmpFiles = partitionInfo
+        .asScala
+        .map { partitionPath =>
+          FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath)
+            .listFiles().toList
+        }.toList.flatten.map(_.getAbsolutePath)
+      // delete files in parallel
+      sparkSession.sparkContext.parallelize(allTmpFiles).map {
+        deleteFileWithRetry(_, 3)
+      }.collect()
+      // delete dirs
       partitionInfo
         .asScala
         .map { partitionPath =>
-          executorService.submit(new Runnable {
-            override def run(): Unit = {
-              ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
-              FileFactory.deleteAllCarbonFilesOfDir(
-                FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath))
-            }
-          })
+          FileFactory.deleteFile(partitionPath + "/" + tempFolderPath)
```

Review comment: This can also be implemented as an RDD if it exceeds the threshold.
jackylk commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#discussion_r380416668

File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala

```diff
@@ -157,21 +157,23 @@ object CarbonMergeFilesRDD {
     if (carbonTable.isHivePartitionTable && !StringUtils.isEmpty(tempFolderPath)) {
       // remove all tmp folder of index files
       val startDelete = System.currentTimeMillis()
-      val numThreads = Math.min(Math.max(partitionInfo.size(), 1), 10)
-      val executorService = Executors.newFixedThreadPool(numThreads)
-      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+      // get all files in all folder of index files
+      val allTmpFiles = partitionInfo
+        .asScala
+        .map { partitionPath =>
+          FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath)
+            .listFiles().toList
+        }.toList.flatten.map(_.getAbsolutePath)
+      // delete files in parallel
+      sparkSession.sparkContext.parallelize(allTmpFiles).map {
```

Review comment: Suggest adding a threshold check: use an RDD only when `allTmpFiles.length` exceeds the threshold.
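Below is a sketch of the suggested threshold check. `threshold` and the two callbacks are hypothetical. In the PR, the distributed branch would be the `sparkContext.parallelize(allTmpFiles)` job, and the local branch would be a plain loop in the driver. Passing both in as functions keeps the dispatch logic testable without Spark.

```scala
object ThresholdedDelete {
  /**
   * Deletes small batches locally and hands large ones to a distributed runner.
   * Returns true when the distributed path was taken.
   * `threshold` is a hypothetical knob, not an existing CarbonData property.
   */
  def deleteAll(files: Seq[String], threshold: Int)(
      deleteLocally: Seq[String] => Unit,
      deleteDistributed: Seq[String] => Unit): Boolean = {
    val useDistributed = files.length > threshold
    // Launching a Spark job has fixed overhead, so only pay it for large batches.
    if (useDistributed) deleteDistributed(files) else deleteLocally(files)
    useDistributed
  }
}
```

The deletion results are discarded, so the distributed branch could also use the RDD `foreach` action instead of `map { ... }.collect()`.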
jackylk commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#discussion_r380416741

File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala

```diff
@@ -191,6 +193,27 @@ object CarbonMergeFilesRDD {
     mergeIndexSize
   }
+  /**
+   * delete the file with retry
+   */
+  def deleteFileWithRetry(filePath: String, _retryTimes: Integer): Unit = {
```

Review comment: Make it private.
jackylk commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#discussion_r380416759

File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala

```diff
@@ -191,6 +193,27 @@ object CarbonMergeFilesRDD {
     mergeIndexSize
   }
+  /**
+   * delete the file with retry
+   */
+  def deleteFileWithRetry(filePath: String, _retryTimes: Integer): Unit = {
+    var retryTimes = _retryTimes
+    while (!deleteFile(filePath) && retryTimes > 0) {
+      retryTimes -= 1
+    }
+  }
+
+  /**
+   * delete the file
+   */
+  def deleteFile(filePath: String): Boolean = {
```

Review comment: Make it private.
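Below is a sketch of the retry helper with the review suggestion applied: only `cleanup` is public, and the retry logic is `private` to the object. `MergeIndexCleanup` and `cleanup` are hypothetical names. The `delete` parameter is a hypothetical stand-in for the PR's `deleteFile`, which wraps `FileFactory.deleteFile`. As in the PR's loop, a retry count of 3 means one initial attempt plus up to three retries. The sketch also takes a Scala `Int` rather than `java.lang.Integer`, which avoids boxing.

```scala
object MergeIndexCleanup {
  /** Public entry point: returns the paths that could not be deleted. */
  def cleanup(paths: Seq[String], delete: String => Boolean): Seq[String] =
    paths.filterNot(p => deleteFileWithRetry(p, 3, delete))

  /** One attempt, then up to `retryTimes` retries; private, as suggested in review. */
  private def deleteFileWithRetry(
      filePath: String,
      retryTimes: Int,
      delete: String => Boolean): Boolean = {
    var remaining = retryTimes
    var deleted = delete(filePath)
    while (!deleted && remaining > 0) {
      remaining -= 1
      deleted = delete(filePath)
    }
    deleted
  }
}
```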
jackylk commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#discussion_r380416855

File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala

```diff
@@ -191,6 +193,27 @@ object CarbonMergeFilesRDD {
     mergeIndexSize
   }
+  /**
+   * delete the file with retry
+   */
+  def deleteFileWithRetry(filePath: String, _retryTimes: Integer): Unit = {
+    var retryTimes = _retryTimes
+    while (!deleteFile(filePath) && retryTimes > 0) {
+      retryTimes -= 1
+    }
+  }
+
+  /**
+   * delete the file
+   */
+  def deleteFile(filePath: String): Boolean = {
+    val success = FileFactory.deleteFile(filePath)
+    if (!success && FileFactory.isFileExist(filePath)) {
```

Review comment: Isn't `if (!success)` alone enough?
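This bears on the question of whether `if (!success)` alone suffices. With `java.io.File.delete`, `false` is returned both when deletion fails and when the file no longer exists, for example because an earlier attempt or another task already removed it. The extra existence check separates those two cases, so an already-missing file is not retried. Below is a minimal sketch using `java.io.File`; `DeleteSemantics` is a hypothetical name. Whether `FileFactory.deleteFile` also returns `false` for missing files on HDFS or S3 is an assumption that the thread does not confirm.

```scala
import java.io.File

object DeleteSemantics {
  /** Fails only when delete() returned false AND the file is still there (same condition as the PR). */
  def deleteFile(file: File): Boolean = {
    val success = file.delete()
    success || !file.exists()
  }
}
```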
Indhumathi27 commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#discussion_r382525176

File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala

```diff
@@ -157,21 +157,23 @@ object CarbonMergeFilesRDD {
     if (carbonTable.isHivePartitionTable && !StringUtils.isEmpty(tempFolderPath)) {
       // remove all tmp folder of index files
       val startDelete = System.currentTimeMillis()
-      val numThreads = Math.min(Math.max(partitionInfo.size(), 1), 10)
-      val executorService = Executors.newFixedThreadPool(numThreads)
-      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+      // get all files in all folder of index files
+      val allTmpFiles = partitionInfo
+        .asScala
+        .map { partitionPath =>
+          FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath)
```

Review comment: You can use `CarbonCommonConstants.FILE_SEPARATOR` instead of `"/"`.
Indhumathi27 commented on a change in pull request #3622: [CARBONDATA-3702] Clean temp index files in parallel in merge index flow
URL: https://github.com/apache/carbondata/pull/3622#discussion_r382525290

File path: integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala

```diff
@@ -157,21 +157,23 @@ object CarbonMergeFilesRDD {
     if (carbonTable.isHivePartitionTable && !StringUtils.isEmpty(tempFolderPath)) {
       // remove all tmp folder of index files
       val startDelete = System.currentTimeMillis()
-      val numThreads = Math.min(Math.max(partitionInfo.size(), 1), 10)
-      val executorService = Executors.newFixedThreadPool(numThreads)
-      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+      // get all files in all folder of index files
+      val allTmpFiles = partitionInfo
+        .asScala
+        .map { partitionPath =>
+          FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath)
+            .listFiles().toList
+        }.toList.flatten.map(_.getAbsolutePath)
+      // delete files in parallel
+      sparkSession.sparkContext.parallelize(allTmpFiles).map {
+        deleteFileWithRetry(_, 3)
+      }.collect()
+      // delete dirs
       partitionInfo
         .asScala
         .map { partitionPath =>
-          executorService.submit(new Runnable {
-            override def run(): Unit = {
-              ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
-              FileFactory.deleteAllCarbonFilesOfDir(
-                FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath))
-            }
-          })
+          FileFactory.deleteFile(partitionPath + "/" + tempFolderPath)
```

Review comment: You can use `CarbonCommonConstants.FILE_SEPARATOR` instead of `"/"`.
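Below is a sketch of building the temp folder path from a separator constant instead of a `"/"` literal. `FileSeparator` is a local stand-in assumed to match `CarbonCommonConstants.FILE_SEPARATOR`, and `TempPaths` and `tempFolderOf` are hypothetical names. Trimming a trailing or leading separator is an extra guard that the PR does not include.

```scala
object TempPaths {
  // Stand-in for CarbonCommonConstants.FILE_SEPARATOR, assumed here to be "/".
  val FileSeparator: String = "/"

  /** Joins a partition path and the temp folder name with exactly one separator. */
  def tempFolderOf(partitionPath: String, tempFolderPath: String): String =
    partitionPath.stripSuffix(FileSeparator) + FileSeparator + tempFolderPath.stripPrefix(FileSeparator)
}
```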