marchpure opened a new pull request #3799: URL: https://github.com/apache/carbondata/pull/3799

### Why is this PR needed?
There are two major performance bottlenecks in 'insert stage':
1) Getting the LastModifyTime of the stage files requires many accesses to OBS.
2) Parallelism is not supported.

### What changes were proposed in this PR?
1) Cache the LastModifyTime info when listing the stage files.
2) Support running insert stage in parallel. We add a 'loading' tag to the stages in process. Concurrent insert stage processes load different data by choosing only the stages that have no 'loading' tag, or whose loading has timed out, which prevents them from loading the same data. The 'loading' tag is actually an empty file whose filename has the '.loading' suffix.

### Does this PR introduce any user interface change?
NO

### Is any new testcase added?
YES

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[hidden email]
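A minimal sketch of both proposed changes, under stated assumptions: a local directory stands in for the OBS stage directory, every file that is not a `.loading` marker is treated as a stage (the real pairing of stage and success files is ignored), and `StageSelector`, `listOnce` and `selectStages` are hypothetical names, not CarbonData APIs. The directory is listed once and each file's last-modified time is cached, so choosing stages needs no further per-file metadata calls; a stage is chosen if it has no marker, or if its marker is older than the timeout.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of CarbonData.
final class StageSelector {
    static final String LOADING_SUFFIX = ".loading";

    // Lists the stage directory once and caches each entry's last-modified
    // time, instead of asking the store again for every file later on.
    static Map<String, Long> listOnce(Path stageDir) throws IOException {
        Map<String, Long> lastModified = new HashMap<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(stageDir)) {
            for (Path entry : entries) {
                lastModified.put(entry.getFileName().toString(),
                        Files.getLastModifiedTime(entry).toMillis());
            }
        }
        return lastModified;
    }

    // Chooses stages with no '.loading' marker, or whose marker is older than
    // timeoutMs (its loader is assumed dead). Sorted for a stable result.
    static List<String> selectStages(Map<String, Long> cache, long nowMs, long timeoutMs) {
        List<String> selected = new ArrayList<>();
        for (String name : cache.keySet()) {
            if (name.endsWith(LOADING_SUFFIX)) {
                continue; // markers are not stages themselves
            }
            Long markerTime = cache.get(name + LOADING_SUFFIX);
            if (markerTime == null || nowMs - markerTime > timeoutMs) {
                selected.add(name);
            }
        }
        Collections.sort(selected);
        return selected;
    }
}
```

For example, with a 500 ms timeout at time 1000, a marker written at 200 has expired while one written at 900 has not. Selection alone does not give mutual exclusion, since two processes could pick the same stage before either writes its marker; in the PR, the stages are tagged while the ingest lock is held, and the lock is released once tagging completes.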
CarbonDataQA1 commented on pull request #3799: URL: https://github.com/apache/carbondata/pull/3799#issuecomment-647196597 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3186/
CarbonDataQA1 commented on pull request #3799: URL: https://github.com/apache/carbondata/pull/3799#issuecomment-647197331 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1460/
niuge01 commented on a change in pull request #3799: URL: https://github.com/apache/carbondata/pull/3799#discussion_r443412925

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
##########
@@ -477,6 +487,48 @@ case class CarbonInsertFromStageCommand(
     output.asScala
   }
+  /**
+   * create '.loading' file to tag the stage in process
+   * Return false means the stage files were creat successfully
+   * While return true means the stage files were failed to create
+   */
+  private def createStageLoadingFiles(
+      executorService: ExecutorService,
+      stageFiles: Array[(CarbonFile, CarbonFile)]): Array[(CarbonFile, CarbonFile)] = {
+    stageFiles.map { files =>
+      executorService.submit(new Callable[Boolean] {
+        override def call(): Boolean = {
+          val stageLoadingFile =
+            FileFactory.getCarbonFile(files._1.getAbsolutePath +
+              CarbonTablePath.LOADING_FILE_SUBFIX);
+          if (!stageLoadingFile.exists()) {
+            stageLoadingFile.createNewFile();
+          } else {
+            stageLoadingFile.setLastModifiedTime(System.currentTimeMillis());
+          }
+        }
+      })
+    }.filter { future =>
+      future.get()
+    }
+    stageFiles
+  }
+
+  /**
+   * create '.loading' file with retry
+   */
+  private def createStageLoadingFilesWithRetry(
+      executorService: ExecutorService,
+      stageFiles: Array[(CarbonFile, CarbonFile)]): Unit = {
+    val startTime = System.currentTimeMillis()
+    var retry = CarbonInsertFromStageCommand.DELETE_FILES_RETRY_TIMES
+    while (createStageLoadingFiles(executorService, stageFiles).length > 0 && retry > 0) {

Review comment:
   Please check this loop condition: if createStageLoadingFiles(executorService, stageFiles).length > 0, should the loop continue?

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
##########
@@ -1903,6 +1903,30 @@ public static Long getInputMetricsInterval() {
     }
   }
+  /**
+   * Validate and get the input metrics interval
+   *
+   * @return input metrics interval
+   */
+  public static Long getInsertStageTimeout() {
+    String timeout = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_INSERT_STAGE_TIMEOUT);
+    if (timeout == null) {
+      return CarbonCommonConstants.CARBON_INSERT_STAGE_TIMEOUT_DEFAULT;
+    } else {
+      try {
+        long configuredValue = Long.parseLong(timeout);
+        if (configuredValue < 0) {
+          return CarbonCommonConstants.CARBON_INSERT_STAGE_TIMEOUT_DEFAULT;

Review comment:
   Log a warning for an illegal configuration value.

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
##########
@@ -1903,6 +1903,30 @@ public static Long getInputMetricsInterval() {
     }
   }
+  /**
+   * Validate and get the input metrics interval
+   *
+   * @return input metrics interval
+   */
+  public static Long getInsertStageTimeout() {
+    String timeout = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_INSERT_STAGE_TIMEOUT);
+    if (timeout == null) {
+      return CarbonCommonConstants.CARBON_INSERT_STAGE_TIMEOUT_DEFAULT;
+    } else {
+      try {
+        long configuredValue = Long.parseLong(timeout);
+        if (configuredValue < 0) {
+          return CarbonCommonConstants.CARBON_INSERT_STAGE_TIMEOUT_DEFAULT;
+        } else {
+          return configuredValue;
+        }
+      } catch (Exception ex) {

Review comment:
   Catch NumberFormatException. Log a warning for the exception.
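The two review comments above could be addressed roughly as follows. This is a sketch, not the code that was merged: `InsertStageTimeout` and `parseTimeout` are hypothetical names, the raw property value is passed in as a parameter instead of being read through `CarbonProperties`, and `java.util.logging` stands in for CarbonData's own logger.

```java
import java.util.logging.Logger;

// Hypothetical stand-in for CarbonProperties.getInsertStageTimeout().
final class InsertStageTimeout {
    private static final Logger LOG = Logger.getLogger(InsertStageTimeout.class.getName());

    static final long DEFAULT_TIMEOUT_MS = 28_800_000L; // 8 hours

    // Returns the configured timeout in milliseconds, or the default when the
    // property is absent, negative, or not a valid long.
    static long parseTimeout(String configured) {
        if (configured == null) {
            return DEFAULT_TIMEOUT_MS;
        }
        try {
            long value = Long.parseLong(configured);
            if (value < 0) {
                LOG.warning("Negative insert stage timeout '" + configured
                        + "', using default " + DEFAULT_TIMEOUT_MS);
                return DEFAULT_TIMEOUT_MS;
            }
            return value;
        } catch (NumberFormatException e) {
            // Only parse failures are caught, as the reviewer suggested.
            LOG.warning("Invalid insert stage timeout '" + configured
                    + "', using default " + DEFAULT_TIMEOUT_MS);
            return DEFAULT_TIMEOUT_MS;
        }
    }
}
```

Catching only `NumberFormatException` keeps unrelated failures visible instead of silently falling back to the default.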

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
##########
@@ -148,10 +149,19 @@ case class CarbonInsertFromStageCommand(
       return Seq.empty
     }
-    // 2) read all stage files to collect input files for data loading
-    // create a thread pool to read them
+    // We add a tag 'loading' to the stages in process.
+    // different insertstage processes can load different data separately
+    // by choose the stages without 'loading' tag or stages loaded timeout.
+    // which avoid loading the same data between concurrent insertstage processes.
+    // The 'loading' tag is actually an empty file with
+    // '.loading' suffix filename
     val numThreads = Math.min(Math.max(stageFiles.length, 1), 10)
     val executorService = Executors.newFixedThreadPool(numThreads)
+    createStageLoadingFilesWithRetry(executorService, stageFiles)
+    lock.unlock()

Review comment:
   Remove this line; the lock will be unlocked in the finally block.

##########
File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
##########
@@ -1521,6 +1521,10 @@ private CarbonCommonConstants() {
   public static final String CARBON_QUERY_STAGE_INPUT_DEFAULT = "false";
+  public static final String CARBON_INSERT_STAGE_TIMEOUT = "carbon.insert.stage.timeout";
+
+  public static final long CARBON_INSERT_STAGE_TIMEOUT_DEFAULT = 28800000;

Review comment:
   Add a comment here, like // 8 hours.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
##########
@@ -477,6 +487,48 @@ case class CarbonInsertFromStageCommand(
     output.asScala
   }
+  /**
+   * create '.loading' file to tag the stage in process
+   * Return false means the stage files were creat successfully
+   * While return true means the stage files were failed to create
+   */
+  private def createStageLoadingFiles(
+      executorService: ExecutorService,
+      stageFiles: Array[(CarbonFile, CarbonFile)]): Array[(CarbonFile, CarbonFile)] = {
+    stageFiles.map { files =>
+      executorService.submit(new Callable[Boolean] {
+        override def call(): Boolean = {
+          val stageLoadingFile =
+            FileFactory.getCarbonFile(files._1.getAbsolutePath +
+              CarbonTablePath.LOADING_FILE_SUBFIX);
+          if (!stageLoadingFile.exists()) {

Review comment:
   exists and createNewFile are not in the same transaction, so createNewFile may fail. Suggest changing it to:
   if (!stageLoadingFile.createNewFile()) {
     stageLoadingFile.setLastModifiedTime(System.currentTimeMillis());
   }
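The suggested change relies on `createNewFile` being a single check-and-create step that returns `false` when the file already exists. A local-filesystem sketch with `java.io.File`, whose `createNewFile` is documented as atomic with respect to other filesystem activity; `StageTagger` and `tagStage` are hypothetical names, and whether CarbonData's `CarbonFile.createNewFile` offers the same guarantee on OBS depends on the underlying store.

```java
import java.io.File;
import java.io.IOException;

// Hypothetical helper for illustration; not part of CarbonData.
final class StageTagger {
    static final String LOADING_SUFFIX = ".loading";

    // Tags a stage as in process. createNewFile() atomically checks for and
    // creates the marker; if the marker already exists, its modification time
    // is refreshed instead. Returns true when the stage ends up tagged.
    static boolean tagStage(File stageFile) {
        File marker = new File(stageFile.getPath() + LOADING_SUFFIX);
        try {
            if (marker.createNewFile()) {
                return true;
            }
            return marker.setLastModified(System.currentTimeMillis());
        } catch (IOException e) {
            return false; // e.g. the parent directory is missing
        }
    }
}
```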
marchpure commented on a change in pull request #3799: URL: https://github.com/apache/carbondata/pull/3799#discussion_r443450793

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
##########
@@ -1903,6 +1903,30 @@ public static Long getInputMetricsInterval() {
     }
   }
+  /**
+   * Validate and get the input metrics interval
+   *
+   * @return input metrics interval
+   */
+  public static Long getInsertStageTimeout() {
+    String timeout = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_INSERT_STAGE_TIMEOUT);
+    if (timeout == null) {
+      return CarbonCommonConstants.CARBON_INSERT_STAGE_TIMEOUT_DEFAULT;
+    } else {
+      try {
+        long configuredValue = Long.parseLong(timeout);
+        if (configuredValue < 0) {
+          return CarbonCommonConstants.CARBON_INSERT_STAGE_TIMEOUT_DEFAULT;
+        } else {
+          return configuredValue;
+        }
+      } catch (Exception ex) {

Review comment:
   modified

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
##########
@@ -1903,6 +1903,30 @@ public static Long getInputMetricsInterval() {
     }
   }
+  /**
+   * Validate and get the input metrics interval
+   *
+   * @return input metrics interval
+   */
+  public static Long getInsertStageTimeout() {
+    String timeout = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_INSERT_STAGE_TIMEOUT);
+    if (timeout == null) {
+      return CarbonCommonConstants.CARBON_INSERT_STAGE_TIMEOUT_DEFAULT;
+    } else {
+      try {
+        long configuredValue = Long.parseLong(timeout);
+        if (configuredValue < 0) {
+          return CarbonCommonConstants.CARBON_INSERT_STAGE_TIMEOUT_DEFAULT;

Review comment:
   modified
marchpure commented on a change in pull request #3799: URL: https://github.com/apache/carbondata/pull/3799#discussion_r443451334

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
##########
@@ -148,10 +149,19 @@ case class CarbonInsertFromStageCommand(
       return Seq.empty
     }
-    // 2) read all stage files to collect input files for data loading
-    // create a thread pool to read them
+    // We add a tag 'loading' to the stages in process.
+    // different insertstage processes can load different data separately
+    // by choose the stages without 'loading' tag or stages loaded timeout.
+    // which avoid loading the same data between concurrent insertstage processes.
+    // The 'loading' tag is actually an empty file with
+    // '.loading' suffix filename
     val numThreads = Math.min(Math.max(stageFiles.length, 1), 10)
     val executorService = Executors.newFixedThreadPool(numThreads)
+    createStageLoadingFilesWithRetry(executorService, stageFiles)
+    lock.unlock()

Review comment:
   It can't be removed, as we aim to release the ingest lock as soon as the chosen stages have been tagged 'loading'.

##########
File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
##########
@@ -1521,6 +1521,10 @@ private CarbonCommonConstants() {
   public static final String CARBON_QUERY_STAGE_INPUT_DEFAULT = "false";
+  public static final String CARBON_INSERT_STAGE_TIMEOUT = "carbon.insert.stage.timeout";
+
+  public static final long CARBON_INSERT_STAGE_TIMEOUT_DEFAULT = 28800000;

Review comment:
   modified
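The lock discussion above can be reconciled in one pattern: release the lock early once tagging completes, and have the `finally` block unlock only if the lock is still held. A sketch assuming a `java.util.concurrent.locks.ReentrantLock` stands in for CarbonData's ingest lock, with hypothetical `tagStages`/`loadStages` callbacks; whether CarbonData's own lock tolerates a second unlock is not assumed either way, which is why the guard is there.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration; ReentrantLock stands in for the ingest lock.
final class StageLoadRunner {
    private final ReentrantLock ingestLock = new ReentrantLock();

    boolean isLocked() {
        return ingestLock.isLocked();
    }

    // Holds the lock only while tagging, then releases it before the long
    // load so other processes can tag and load other stages meanwhile.
    void run(Runnable tagStages, Runnable loadStages) {
        ingestLock.lock();
        try {
            tagStages.run();
            ingestLock.unlock(); // early release once tagging completes
            loadStages.run();
        } finally {
            // Still held only if tagging threw before the early release;
            // ReentrantLock.unlock() would throw if called when not held.
            if (ingestLock.isHeldByCurrentThread()) {
                ingestLock.unlock();
            }
        }
    }
}
```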
marchpure commented on a change in pull request #3799: URL: https://github.com/apache/carbondata/pull/3799#discussion_r443451443

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
##########
@@ -477,6 +487,48 @@ case class CarbonInsertFromStageCommand(
     output.asScala
   }
+  /**
+   * create '.loading' file to tag the stage in process
+   * Return false means the stage files were creat successfully
+   * While return true means the stage files were failed to create
+   */
+  private def createStageLoadingFiles(
+      executorService: ExecutorService,
+      stageFiles: Array[(CarbonFile, CarbonFile)]): Array[(CarbonFile, CarbonFile)] = {
+    stageFiles.map { files =>
+      executorService.submit(new Callable[Boolean] {
+        override def call(): Boolean = {
+          val stageLoadingFile =
+            FileFactory.getCarbonFile(files._1.getAbsolutePath +
+              CarbonTablePath.LOADING_FILE_SUBFIX);
+          if (!stageLoadingFile.exists()) {

Review comment:
   modified
CarbonDataQA1 commented on pull request #3799: URL: https://github.com/apache/carbondata/pull/3799#issuecomment-647418441 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1464/
marchpure commented on a change in pull request #3799: URL: https://github.com/apache/carbondata/pull/3799#discussion_r443452016

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
##########
@@ -477,6 +487,48 @@ case class CarbonInsertFromStageCommand(
     output.asScala
   }
+  /**
+   * create '.loading' file to tag the stage in process
+   * Return false means the stage files were creat successfully
+   * While return true means the stage files were failed to create
+   */
+  private def createStageLoadingFiles(
+      executorService: ExecutorService,
+      stageFiles: Array[(CarbonFile, CarbonFile)]): Array[(CarbonFile, CarbonFile)] = {
+    stageFiles.map { files =>
+      executorService.submit(new Callable[Boolean] {
+        override def call(): Boolean = {
+          val stageLoadingFile =
+            FileFactory.getCarbonFile(files._1.getAbsolutePath +
+              CarbonTablePath.LOADING_FILE_SUBFIX);
+          if (!stageLoadingFile.exists()) {
+            stageLoadingFile.createNewFile();
+          } else {
+            stageLoadingFile.setLastModifiedTime(System.currentTimeMillis());
+          }
+        }
+      })
+    }.filter { future =>
+      future.get()
+    }
+    stageFiles
+  }
+
+  /**
+   * create '.loading' file with retry
+   */
+  private def createStageLoadingFilesWithRetry(
+      executorService: ExecutorService,
+      stageFiles: Array[(CarbonFile, CarbonFile)]): Unit = {
+    val startTime = System.currentTimeMillis()
+    var retry = CarbonInsertFromStageCommand.DELETE_FILES_RETRY_TIMES
+    while (createStageLoadingFiles(executorService, stageFiles).length > 0 && retry > 0) {

Review comment:
   Checked. The loop should continue: createStageLoadingFiles(executorService, stageFiles).length equals the number of stages that failed to be tagged 'loading'. If length > 0, the loop continues and retries tagging them 'loading'.
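The semantics described in the reply, where each attempt returns only the stages that still failed to be tagged and the loop retries just those, can be sketched as follows. `StageRetry`, `tagAll`, `tagWithRetry` and the `tagger` predicate are hypothetical, and tagging is sequential here for brevity, whereas the PR submits each tag to an `ExecutorService`. Note that in the hunk as quoted, `createStageLoadingFiles` discards the result of `filter` and returns `stageFiles` unchanged, so its length would be the number of stages rather than the number of failures; this sketch returns the filtered list instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper for illustration; not part of CarbonData.
final class StageRetry {
    // Tries to tag every stage; returns only the stages that could NOT be tagged.
    static <T> List<T> tagAll(List<T> stages, Predicate<T> tagger) {
        List<T> failed = new ArrayList<>();
        for (T stage : stages) {
            if (!tagger.test(stage)) {
                failed.add(stage);
            }
        }
        return failed;
    }

    // Retries only the failed stages, up to maxRetries extra attempts.
    // Returns the stages still untagged (empty when everything succeeded).
    static <T> List<T> tagWithRetry(List<T> stages, Predicate<T> tagger, int maxRetries) {
        List<T> pending = tagAll(stages, tagger);
        int retry = maxRetries;
        while (!pending.isEmpty() && retry > 0) {
            pending = tagAll(pending, tagger);
            retry--;
        }
        return pending;
    }
}
```

Returning the still-pending stages lets the caller decide what to do with stages that never got tagged, for example leaving them for another insert stage process.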
CarbonDataQA1 commented on pull request #3799: URL: https://github.com/apache/carbondata/pull/3799#issuecomment-647418962 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3190/
CarbonDataQA1 commented on pull request #3799: URL: https://github.com/apache/carbondata/pull/3799#issuecomment-647491662 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3191/
CarbonDataQA1 commented on pull request #3799: URL: https://github.com/apache/carbondata/pull/3799#issuecomment-647492250 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1465/
CarbonDataQA1 commented on pull request #3799: URL: https://github.com/apache/carbondata/pull/3799#issuecomment-647923116 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3194/
CarbonDataQA1 commented on pull request #3799: URL: https://github.com/apache/carbondata/pull/3799#issuecomment-647934951 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1468/
CarbonDataQA1 commented on pull request #3799: URL: https://github.com/apache/carbondata/pull/3799#issuecomment-648234864 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3196/
CarbonDataQA1 commented on pull request #3799: URL: https://github.com/apache/carbondata/pull/3799#issuecomment-648235972 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1470/
CarbonDataQA1 commented on pull request #3799: URL: https://github.com/apache/carbondata/pull/3799#issuecomment-648576741 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/3198/
CarbonDataQA1 commented on pull request #3799: URL: https://github.com/apache/carbondata/pull/3799#issuecomment-648577186 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/1472/
niuge01 commented on pull request #3799: URL: https://github.com/apache/carbondata/pull/3799#issuecomment-648622942 LGTM
asfgit closed pull request #3799: URL: https://github.com/apache/carbondata/pull/3799