marchpure opened a new pull request #3842:
URL: https://github.com/apache/carbondata/pull/3842

### Why is this PR needed?
Currently, cleaning the temp index files in the merge index flow takes a long time, sometimes 2~3 minutes, and should be optimized.

### What changes were proposed in this PR?
Clean the temp index files in parallel in the merge index flow.

### Does this PR introduce any user interface change?
- No

### Is any new testcase added?
- Yes

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: [hidden email]
CarbonDataQA1 commented on pull request #3842:
URL: https://github.com/apache/carbondata/pull/3842#issuecomment-658033201

Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3384/
CarbonDataQA1 commented on pull request #3842:
URL: https://github.com/apache/carbondata/pull/3842#issuecomment-658033328

Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1643/
marchpure commented on pull request #3842:
URL: https://github.com/apache/carbondata/pull/3842#issuecomment-658036209

retest this please
QiangCai commented on a change in pull request #3842:
URL: https://github.com/apache/carbondata/pull/3842#discussion_r454233869

File path: integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala

@@ -157,21 +157,21 @@ object CarbonMergeFilesRDD {
     if (carbonTable.isHivePartitionTable && !StringUtils.isEmpty(tempFolderPath)) {
       // remove all tmp folder of index files
       val startDelete = System.currentTimeMillis()
-      val numThreads = Math.min(Math.max(partitionInfo.size(), 1), 10)
-      val executorService = Executors.newFixedThreadPool(numThreads)
-      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
-      partitionInfo
-        .asScala
-        .map { partitionPath =>
-          executorService.submit(new Runnable {
-            override def run(): Unit = {
-              ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
-              FileFactory.deleteAllCarbonFilesOfDir(
-                FileFactory.getCarbonFile(partitionPath + "/" + tempFolderPath))
-            }
-          })
+      val allTmpDirs = partitionInfo
+        .asScala.map { partitionPath =>
+          partitionPath + CarbonCommonConstants.FILE_SEPARATOR + tempFolderPath
         }
-        .map(_.get())
+      val allTmpFiles = allTmpDirs.map { partitionDir =>
+        FileFactory.getCarbonFile(partitionDir).listFiles()
+      }.flatten.map(_.getAbsolutePath)
+      // delete tmp files in parallel
+      sparkSession.sparkContext.parallelize(allTmpFiles).map {
+        FileFactory.getCarbonFile(_).delete()
+      }.collect()
+      // delete tmp dir in parallel
+      sparkSession.sparkContext.parallelize(allTmpDirs).map {
+        FileFactory.getCarbonFile(_).delete()
+      }.collect()

Review comment: You can just use a Spark job to replace the multi-threading: one task deletes one dir together with its files.
CarbonDataQA1 commented on pull request #3842:
URL: https://github.com/apache/carbondata/pull/3842#issuecomment-658101120

Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3387/
CarbonDataQA1 commented on pull request #3842:
URL: https://github.com/apache/carbondata/pull/3842#issuecomment-658101946

Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1646/
QiangCai commented on a change in pull request #3842:
URL: https://github.com/apache/carbondata/pull/3842#discussion_r456206506

File path: integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
(comment on the same hunk quoted above, at the line `FileFactory.getCarbonFile(partitionDir).listFiles()`)

Review comment: If loading creates too many partitions, this listFiles will also take a long time.
VenuReddy2103 commented on a change in pull request #3842:
URL: https://github.com/apache/carbondata/pull/3842#discussion_r457648464

File path: integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
(comment on the same hunk, at the line `FileFactory.getCarbonFile(partitionDir).listFiles()`)

Review comment: Instead, how about removing these .carbonindex files and the .tmp directory in `CarbonMergeFilesRDD.internalCompute` itself, upon successful generation of each carbonindexmerge file? That would distribute the cleanup to the partition-level tasks. Also, `listFiles()` would already have been called before reading the index files to generate the carbonindexmerge file.
QiangCai commented on a change in pull request #3842:
URL: https://github.com/apache/carbondata/pull/3842#discussion_r457795146

File path: integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
(comment on the same hunk, at the line `FileFactory.getCarbonFile(partitionDir).listFiles()`)

Review comment: If `CarbonMergeFilesRDD.internalCompute` removes them, the task can't retry successfully after a task failure.
CarbonDataQA1 commented on pull request #3842:
URL: https://github.com/apache/carbondata/pull/3842#issuecomment-704121200

Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4311/