niuge01 opened a new pull request #3578: [CARBONDATA-3663] Support loading stage files in batches
URL: https://github.com/apache/carbondata/pull/3578

### Why is this PR needed?
When there are a lot of stage files in the stage directory, loading all of them at once makes the loading time uncontrollable. Users need a way to specify the number of stage files processed per command, so that the execution time of the command can be controlled.

### What changes were proposed in this PR?
Add a load option `batch_file_count` that lets users specify the number of stage files processed per command.

### Does this PR introduce any user interface change?
- Yes

### Is any new testcase added?
- Yes

----------------------------------------------------------------
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] With regards, Apache Git Services
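The batching behavior described above can be sketched in a few lines. This is an illustrative Python sketch of the idea only, not CarbonData's actual Scala implementation; the function name and signature are hypothetical:

```python
# Illustrative sketch of the 'batch_file_count' idea (hypothetical names,
# not CarbonData's actual implementation).
def select_stage_files(stage_files, batch_file_count=None):
    """Pick the stage files one INSERT ... STAGE invocation should load."""
    if batch_file_count is None:
        # Default behavior: load every pending stage file at once,
        # so command runtime grows with the backlog.
        return list(stage_files)
    # With the option set, each command handles a bounded number of files,
    # keeping its execution time predictable.
    return list(stage_files)[:batch_file_count]

pending = [f"stage_{i}" for i in range(12)]
first_batch = select_stage_files(pending, batch_file_count=5)
# first_batch holds 5 files; later invocations pick up the remainder.
```

Under this model, each invocation consumes its batch of stage files, so repeating `INSERT INTO ... STAGE OPTIONS('batch_file_count'='5')` drains the backlog in bounded steps.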
CarbonDataQA1 commented on issue #3578: [CARBONDATA-3663] Support loading stage files in batches
URL: https://github.com/apache/carbondata/pull/3578#issuecomment-574063814 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1628/
Zhangshunyu commented on a change in pull request #3578: [CARBONDATA-3663] Support loading stage files in batches
URL: https://github.com/apache/carbondata/pull/3578#discussion_r366661610

########## File path: integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala ##########
@@ -103,14 +103,91 @@ class TestCarbonWriter extends QueryTest {
       sql(s"INSERT INTO $tableName STAGE")

-      checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(1000)))
+      checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(1000)))

       // ensure the stage snapshot file and all stage files are deleted
       assertResult(false)(FileFactory.isFileExist(CarbonTablePath.getStageSnapshotFile(tablePath)))
       assertResult(true)(FileFactory.getCarbonFile(CarbonTablePath.getStageDir(tablePath)).listFiles().isEmpty)
     } finally {
-      sql(s"drop table if exists $tableName").collect()
+      sql(s"DROP TABLE IF EXISTS $tableName").collect()
+      new File(dataPath).delete()
+    }
+  }
+
+  @Test
+  def testBatchLoad(): Unit = {
+    sql(s"DROP TABLE IF EXISTS $tableName").collect()
+    sql(
+      s"""
+         | CREATE TABLE $tableName (stringField string, intField int, shortField short)
+         | STORED AS carbondata
+      """.stripMargin
+    ).collect()
+
+    val rootPath = System.getProperty("user.dir") + "/target/test-classes"
+
+    val dataTempPath = rootPath + "/data/temp/"
+    val dataPath = rootPath + "/data/"
+    new File(dataPath).delete()
+    new File(dataPath).mkdir()
+
+    try {
+      val tablePath = storeLocation + "/" + tableName + "/"
+
+      val writerProperties = newWriterProperties(dataTempPath, dataPath, storeLocation)
+      val carbonProperties = newCarbonProperties(storeLocation)
+
+      val environment = StreamExecutionEnvironment.getExecutionEnvironment
+      environment.setParallelism(1)
+      environment.setRestartStrategy(RestartStrategies.noRestart)
+
+      val dataCount = 1000
+      val source = new TestSource(dataCount) {
+        @throws[InterruptedException]
+        override def get(index: Int): Array[AnyRef] = {
+          val data = new Array[AnyRef](3)
+          data(0) = "test" + index
+          data(1) = index.asInstanceOf[AnyRef]
+          data(2) = 12345.asInstanceOf[AnyRef]
+          data
+        }
+
+        @throws[InterruptedException]
+        override def onFinish(): Unit = {
+          Thread.sleep(5000L)
+        }
+      }
+      val stream = environment.addSource(source)
+      val factory = CarbonWriterFactory.builder("Local").build(
+        "default",
+        tableName,
+        tablePath,
+        new Properties,
+        writerProperties,
+        carbonProperties
+      )
+      val streamSink = StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), factory).build
+
+      stream.addSink(streamSink)
+
+      try environment.execute
+      catch {
+        case exception: Exception =>
+          // TODO
+          throw new UnsupportedOperationException(exception)
+      }
+
+      sql(s"INSERT INTO $tableName STAGE OPTIONS ('batch_file_count' = '5')")
+
+      checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(500)))

Review comment:
   why does each stage file have 100 lines?
Zhangshunyu commented on issue #3578: [CARBONDATA-3663] Support loading stage files in batches
URL: https://github.com/apache/carbondata/pull/3578#issuecomment-574458409 LGTM
jackylk commented on a change in pull request #3578: [CARBONDATA-3663] Support loading stage files in batches
URL: https://github.com/apache/carbondata/pull/3578#discussion_r366744149

########## File path: docs/dml-of-carbondata.md ##########
@@ -316,12 +316,14 @@ CarbonData DML statements are documented here,which includes:
   You can use this command to insert them into the table, so that making them visible for query.

   ```
-  INSERT INTO <CARBONDATA TABLE> STAGE
+  INSERT INTO <CARBONDATA TABLE> STAGE OPTIONS(property_name=property_value, ...)
   ```

Review comment:
   please describe all supported options in this document
niuge01 commented on issue #3578: [CARBONDATA-3663] Support loading stage files in batches
URL: https://github.com/apache/carbondata/pull/3578#issuecomment-575039406 retest this please
CarbonDataQA1 commented on issue #3578: [CARBONDATA-3663] Support loading stage files in batches
URL: https://github.com/apache/carbondata/pull/3578#issuecomment-575041371 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1657/
niuge01 commented on a change in pull request #3578: [CARBONDATA-3663] Support loading stage files in batches
URL: https://github.com/apache/carbondata/pull/3578#discussion_r367297272

########## File path: integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala ##########

Review comment:
   Specified by the option CarbonLocalProperty.COMMIT_THRESHOLD:
   writerProperties.put(CarbonLocalProperty.COMMIT_THRESHOLD, "100")
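The expected count of 500 follows from that writer property: with COMMIT_THRESHOLD at 100, the 1000-row source is flushed into 10 stage files of 100 rows each, and loading 5 of them makes 500 rows visible. A quick arithmetic check (Python, variable names illustrative):

```python
# Arithmetic behind checkAnswer(..., Seq(Row(500))) in the test above.
total_rows = 1000       # rows emitted by TestSource
commit_threshold = 100  # CarbonLocalProperty.COMMIT_THRESHOLD: rows per stage file
batch_file_count = 5    # 'batch_file_count' option of INSERT ... STAGE

stage_files = total_rows // commit_threshold        # 10 stage files produced
visible_rows = batch_file_count * commit_threshold  # 500 rows after first load
print(stage_files, visible_rows)
```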
niuge01 commented on a change in pull request #3578: [CARBONDATA-3663] Support loading stage files in batches
URL: https://github.com/apache/carbondata/pull/3578#discussion_r367297390

########## File path: docs/dml-of-carbondata.md ##########

Review comment:
   done
niuge01 commented on issue #3578: [CARBONDATA-3663] Support loading stage files in batches
URL: https://github.com/apache/carbondata/pull/3578#issuecomment-575421320 retest this please
CarbonDataQA1 commented on issue #3578: [CARBONDATA-3663] Support loading stage files in batches
URL: https://github.com/apache/carbondata/pull/3578#issuecomment-575438429 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1671/
niuge01 commented on issue #3578: [CARBONDATA-3663] Support loading stage files in batches
URL: https://github.com/apache/carbondata/pull/3578#issuecomment-575605130 retest this please
CarbonDataQA1 commented on issue #3578: [CARBONDATA-3663] Support loading stage files in batches
URL: https://github.com/apache/carbondata/pull/3578#issuecomment-575631025 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1680/
asfgit closed pull request #3578: [CARBONDATA-3663] Support loading stage files in batches
URL: https://github.com/apache/carbondata/pull/3578