marchpure opened a new pull request #3976:
URL: https://github.com/apache/carbondata/pull/3976

### Why is this PR needed?
Some code paths in Inserting/Loading/InsertStage/IndexServer never shut down their ExecutorService. This leaks threads, which degrades the performance of the driver and the executors.

### What changes were proposed in this PR?
Shut down each ExecutorService as soon as it is no longer needed.

### Does this PR introduce any user interface change?
- No

### Is any new testcase added?
- No

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]
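A minimal Java sketch of the "shut the pool down once you are done with it" pattern, assuming a generic list of `Callable` tasks. The class `BoundedPoolSketch` and method `runAll` are hypothetical illustrations, not CarbonData code; only the thread-count rule `Math.min(Math.max(n, 1), 10)` is taken from the diffs in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BoundedPoolSketch {
    // Hypothetical helper: runs every task on a short-lived pool and always shuts the pool down.
    static <T> List<T> runAll(List<Callable<T>> tasks) {
        // Same sizing rule as the PR diffs: between 1 and 10 threads.
        int numThreads = Math.min(Math.max(tasks.size(), 1), 10);
        // Created before the try block, so the variable can be final and needs no null check.
        final ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
        try {
            List<Future<T>> futures = new ArrayList<>();
            for (Callable<T> task : tasks) {
                futures.add(executorService.submit(task));
            }
            List<T> results = new ArrayList<>();
            for (Future<T> future : futures) {
                results.add(future.get()); // waits for each task, in submission order
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            // Runs on every path (success or failure) and releases the worker threads.
            executorService.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            final int n = i;
            tasks.add(() -> n * n);
        }
        System.out.println(runAll(tasks)); // [1, 4, 9]
    }
}
```

Creating the pool outside the `try` means a failure in `newFixedThreadPool` never reaches the `finally` block with a null reference. On the success path `shutdownNow()` interrupts nothing, because every future has already completed. On a failure path it interrupts any tasks that are still running.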
CarbonDataQA1 commented on pull request #3976:
URL: https://github.com/apache/carbondata/pull/3976#issuecomment-706766379

Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2606/
CarbonDataQA1 commented on pull request #3976:
URL: https://github.com/apache/carbondata/pull/3976#issuecomment-706767341

Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4356/
Kejian-Li commented on a change in pull request #3976:
URL: https://github.com/apache/carbondata/pull/3976#discussion_r503000769

##########
File path: core/src/main/java/org/apache/carbondata/core/statusmanager/StageInputCollector.java
##########
@@ -65,8 +65,15 @@
     collectStageFiles(table, hadoopConf, stageInputFiles, successFiles);
     if (stageInputFiles.size() > 0) {
       int numThreads = Math.min(Math.max(stageInputFiles.size(), 1), 10);
-      ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
-      return createInputSplits(executorService, stageInputFiles);
+      ExecutorService executorService = null;
+      try {
+        executorService = Executors.newFixedThreadPool(numThreads);
+        return createInputSplits(executorService, stageInputFiles);
+      } finally {
+        if (executorService != null && !executorService.isShutdown()) {
+          executorService.shutdownNow();
+        }
+      }

Review comment:
executorService has already been shut down and is then passed to createInputSplits; is that okay?
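The question turns on Java's `try`/`finally` ordering: the `return` expression, including the call to `createInputSplits`, is evaluated first, and the `finally` block runs only afterwards. A small demo under that reading, where `FinallyOrderDemo` and `useExecutor` are hypothetical stand-ins rather than CarbonData code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FinallyOrderDemo {
    // Records the order of events (touched only by the calling thread).
    static final List<String> events = new ArrayList<>();

    // Hypothetical stand-in for createInputSplits: notes whether the pool is still open, then uses it.
    static int useExecutor(ExecutorService executor) {
        events.add("use: isShutdown=" + executor.isShutdown());
        try {
            return executor.submit(() -> 42).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    static int collect() {
        events.clear();
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // The return expression is fully evaluated before the finally block runs.
            return useExecutor(executor);
        } finally {
            events.add("finally: shutdownNow");
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(collect()); // 42
        System.out.println(events);    // [use: isShutdown=false, finally: shutdownNow]
    }
}
```

The recorded events show that the pool is still open while it is being used, and that it is shut down only after the call has returned its value.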
Kejian-Li commented on a change in pull request #3976:
URL: https://github.com/apache/carbondata/pull/3976#discussion_r503000913

##########
File path: core/src/main/java/org/apache/carbondata/core/statusmanager/StageInputCollector.java
##########
@@ -65,8 +65,15 @@
     collectStageFiles(table, hadoopConf, stageInputFiles, successFiles);
     if (stageInputFiles.size() > 0) {
       int numThreads = Math.min(Math.max(stageInputFiles.size(), 1), 10);
-      ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
-      return createInputSplits(executorService, stageInputFiles);
+      ExecutorService executorService = null;
+      try {
+        executorService = Executors.newFixedThreadPool(numThreads);
+        return createInputSplits(executorService, stageInputFiles);
+      } finally {
+        if (executorService != null && !executorService.isShutdown()) {
+          executorService.shutdownNow();
+        }
+      }

Review comment:
executorService has already been shut down and is then passed to createInputSplits; is that okay?
Kejian-Li commented on pull request #3976:
URL: https://github.com/apache/carbondata/pull/3976#issuecomment-706813764

LGTM
QiangCai commented on a change in pull request #3976:
URL: https://github.com/apache/carbondata/pull/3976#discussion_r503000562

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
##########
@@ -123,6 +123,9 @@ object IndexServer extends ServerInterface {
         t
       }
     })
+    indexServerExecutorService.get.shutdown()

Review comment:
After this shutdown() call, the executor service will not accept any more tasks.

##########
File path: integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
##########
@@ -158,20 +158,27 @@ object CarbonMergeFilesRDD {
     // remove all tmp folder of index files
     val startDelete = System.currentTimeMillis()
     val numThreads = Math.min(Math.max(partitionInfo.size(), 1), 10)
-    val executorService = Executors.newFixedThreadPool(numThreads)
-    val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
-    partitionInfo
-      .asScala
-      .map { partitionPath =>
-        executorService.submit(new Runnable {
-          override def run(): Unit = {
-            ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
-            FileFactory.deleteAllCarbonFilesOfDir(
-              FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath))
-          }
-        })
+    var executorService: ExecutorService = null
+    try {
+      executorService = Executors.newFixedThreadPool(numThreads)

Review comment:
Move line 163 to line 161.

##########
File path: core/src/main/java/org/apache/carbondata/core/statusmanager/StageInputCollector.java
##########
@@ -65,8 +65,15 @@
     collectStageFiles(table, hadoopConf, stageInputFiles, successFiles);
     if (stageInputFiles.size() > 0) {
       int numThreads = Math.min(Math.max(stageInputFiles.size(), 1), 10);
-      ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
-      return createInputSplits(executorService, stageInputFiles);
+      ExecutorService executorService = null;
+      try {
+        executorService = Executors.newFixedThreadPool(numThreads);

Review comment:
Move line 70 to line 68.
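The IndexServer comment follows from the `ExecutorService` contract: after `shutdown()`, tasks that were already submitted still complete, but any new submission is rejected. With the default abort policy used by the `Executors` factory methods, that rejection surfaces as a `RejectedExecutionException`. A small Java sketch of both halves, where `ShutdownSemanticsDemo` and its methods are hypothetical names:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;

public class ShutdownSemanticsDemo {
    // Returns true if a task submitted after shutdown() is rejected.
    static boolean lateSubmissionRejected() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.shutdown(); // from here on, new tasks are refused
        try {
            pool.submit(() -> 8);
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    // Returns the result of a task submitted before shutdown(); it still runs to completion.
    static int earlyTaskResultAfterShutdown() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Future<Integer> early = pool.submit(() -> 7);
        pool.shutdown(); // orderly shutdown: already-submitted work is not cancelled
        try {
            return early.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(lateSubmissionRejected());       // true
        System.out.println(earlyTaskResultAfterShutdown()); // 7
    }
}
```

Suppose a long-lived pool is shut down right after it is created, as in the flagged IndexServer line. Under this contract, every later submission to that pool would be rejected. `shutdown()` belongs at the point where the owner of the pool is finished with it.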
CarbonDataQA1 commented on pull request #3976:
URL: https://github.com/apache/carbondata/pull/3976#issuecomment-706847834

Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4361/
CarbonDataQA1 commented on pull request #3976:
URL: https://github.com/apache/carbondata/pull/3976#issuecomment-706849697

Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2611/
marchpure commented on a change in pull request #3976:
URL: https://github.com/apache/carbondata/pull/3976#discussion_r503031223

##########
File path: core/src/main/java/org/apache/carbondata/core/statusmanager/StageInputCollector.java
##########
@@ -65,8 +65,15 @@
     collectStageFiles(table, hadoopConf, stageInputFiles, successFiles);
     if (stageInputFiles.size() > 0) {
       int numThreads = Math.min(Math.max(stageInputFiles.size(), 1), 10);
-      ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
-      return createInputSplits(executorService, stageInputFiles);
+      ExecutorService executorService = null;
+      try {
+        executorService = Executors.newFixedThreadPool(numThreads);

Review comment:
I have modified the code according to your suggestion.

##########
File path: integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
##########
@@ -158,20 +158,27 @@ object CarbonMergeFilesRDD {
     // remove all tmp folder of index files
     val startDelete = System.currentTimeMillis()
     val numThreads = Math.min(Math.max(partitionInfo.size(), 1), 10)
-    val executorService = Executors.newFixedThreadPool(numThreads)
-    val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
-    partitionInfo
-      .asScala
-      .map { partitionPath =>
-        executorService.submit(new Runnable {
-          override def run(): Unit = {
-            ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
-            FileFactory.deleteAllCarbonFilesOfDir(
-              FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath))
-          }
-        })
+    var executorService: ExecutorService = null
+    try {
+      executorService = Executors.newFixedThreadPool(numThreads)

Review comment:
I have modified the code according to your suggestion.

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
##########
@@ -123,6 +123,9 @@ object IndexServer extends ServerInterface {
         t
       }
     })
+    indexServerExecutorService.get.shutdown()

Review comment:
I have modified the code according to your suggestion.
QiangCai commented on pull request #3976:
URL: https://github.com/apache/carbondata/pull/3976#issuecomment-707075552

LGTM
asfgit closed pull request #3976:
URL: https://github.com/apache/carbondata/pull/3976