GitHub user kunal642 opened a pull request:
    https://github.com/apache/carbondata/pull/2084

[WIP][CARBONDATA-1522] Support preaggregate table creation and loading on streaming tables

1. Added support to create a preaggregate datamap on a streaming table.
2. Added support to load data into the datamap after handoff is fired for a streaming table.
3. Added transaction support for datamap loading.

Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily:

 - [X] Any interfaces changed?
 - [X] Any backward compatibility impacted?
 - [X] Document update required?
 - [X] Testing done
        Please provide details on
        - Whether new unit test cases have been added or why no new tests are required?
        - How it is tested? Please attach test report.
        - Is it a performance related change? Please attach the performance test report.
        - Any additional information to help reviewers in testing this change.
 - [X] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kunal642/carbondata streaming_preagg_support

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2084.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2084

----

commit 8a86c43fa343fbfa4d43dc1d49c242c28f2b6cc8
Author: kunal642 <kunalkapoor642@...>
Date:   2018-03-20T06:11:17Z

    1. Added support to create preaggregate datamap on streaming table
    2. Added support to load data into datamap after handoff is fired for streaming table
    3. Added transaction support for datamap loading

commit 631dba88ab18f93543325b560e9fcaa311f05dd6
Author: kunal642 <kunalkapoor642@...>
Date:   2018-03-20T07:04:08Z

    refactor

commit 8632c2baf3676f60f38691f334891737a229954d
Author: kunal642 <kunalkapoor642@...>
Date:   2018-03-21T04:57:03Z

    added test cases

----
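For orientation, the flow this PR enables can be pieced together from the test cases quoted later in this thread. A minimal sketch, using the same SQL those tests issue (table and datamap names are taken from the tests):

    // Create a preaggregate datamap on a streaming table; these SQL strings
    // match the ones the new test cases run via Spark's sql(...).
    sql("create datamap p1 on table agg_table using 'preaggregate' " +
      "as select name, sum(salary) from agg_table group by name")
    // While the table is still streaming, the datamap stays empty; handoff,
    // triggered by finishing streaming and running streaming compaction,
    // loads the accumulated rows into it.
    sql("alter table agg_table finish streaming")
    sql("alter table agg_table compact 'streaming'")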
Github user CarbonDataQA commented on the issue:
    https://github.com/apache/carbondata/pull/2084

    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3232/

---
Github user CarbonDataQA commented on the issue:
    https://github.com/apache/carbondata/pull/2084

    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4465/

---
Github user CarbonDataQA commented on the issue:
    https://github.com/apache/carbondata/pull/2084

    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4466/

---
Github user CarbonDataQA commented on the issue:
    https://github.com/apache/carbondata/pull/2084

    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3233/

---
Github user ravipesala commented on the issue:
    https://github.com/apache/carbondata/pull/2084

    SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3982/

---
Github user ravipesala commented on the issue:
    https://github.com/apache/carbondata/pull/2084

    SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3983/

---
Github user CarbonDataQA commented on the issue:
    https://github.com/apache/carbondata/pull/2084

    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4475/

---
Github user CarbonDataQA commented on the issue:
    https://github.com/apache/carbondata/pull/2084

    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3243/

---
Github user ravipesala commented on the issue:
    https://github.com/apache/carbondata/pull/2084

    SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3991/

---
Github user jackylk commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/2084#discussion_r176075242

    --- Diff: examples/flink/src/main/scala/org/apache/carbondata/examples/FlinkExample.scala ---
    @@ -21,6 +21,7 @@
     import org.apache.flink.api.java.ExecutionEnvironment
     import org.apache.hadoop.conf.Configuration
     import org.apache.hadoop.mapreduce.Job
    +import org.apache.carbondata.examples.util.ExampleUtils
    --- End diff --

    Why is this required?

---
Github user jackylk commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/2084#discussion_r176076016

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -199,9 +199,12 @@ object LoadPostAggregateListener extends OperationEventListener {
        * @param event
        */
       override def onEvent(event: Event, operationContext: OperationContext): Unit = {
    -    val loadEvent = event.asInstanceOf[LoadTablePreStatusUpdateEvent]
    +    val carbonLoadModel =
    +      event match {
    +        case e: LoadTablePreStatusUpdateEvent => e.getCarbonLoadModel
    +        case e: LoadTablePostExecutionEvent => e.getCarbonLoadModel
    +      }
    --- End diff --

    Add a case for other event types.

---
Github user jackylk commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/2084#discussion_r176077283

    --- Diff: streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala ---
    @@ -299,8 +302,8 @@ object StreamHandoffRDD {
       def executeStreamingHandoff(
           carbonLoadModel: CarbonLoadModel,
           sparkSession: SparkSession,
    -      handoffSegmenId: String
    -  ): Unit = {
    +      operationContext: OperationContext,
    --- End diff --

    Why is operationContext needed?

---
Github user kunal642 commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/2084#discussion_r176649884

    --- Diff: streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala ---
    @@ -299,8 +302,8 @@ object StreamHandoffRDD {
       def executeStreamingHandoff(
           carbonLoadModel: CarbonLoadModel,
           sparkSession: SparkSession,
    -      handoffSegmenId: String
    -  ): Unit = {
    +      operationContext: OperationContext,
    --- End diff --

    The operation context carries the CarbonLoadDataCommand instances that are needed in PreAggregateListeners.

---
Github user manishgupta88 commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/2084#discussion_r176651976

    --- Diff: integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---
    @@ -249,6 +251,91 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
         assertResult(exceptedRow)(row)
       }

    +  test("test preaggregate table creation on streaming table without handoff") {
    +    val identifier = new TableIdentifier("agg_table", Option("streaming"))
    +    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
    +      .asInstanceOf[CarbonRelation].metaData.carbonTable
    +    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
    +    // streaming ingest 10 rows
    +    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
    +    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
    +      identifier)
    +    thread.start()
    +    Thread.sleep(2000)
    +    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
    +    Thread.sleep(5000)
    +    thread.interrupt()
    +    checkAnswer(
    +      sql("select count(*) from streaming.agg_table"),
    +      Seq(Row(10)))
    +    sql("create datamap p1 on table agg_table using 'preaggregate' as select name, sum(salary) from agg_table group by name")
    +    // No data should be loaded into aggregate table as hand-off is not yet fired
    +    checkAnswer(sql("select * from agg_table_p1"), Seq())
    +  }
    +
    +  test("test if data is loaded into preaggregate after handoff is fired") {
    +    createTable(tableName = "agg_table2", streaming = true, withBatchLoad = false)
    +    val identifier = new TableIdentifier("agg_table2", Option("streaming"))
    +    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
    +      .asInstanceOf[CarbonRelation].metaData.carbonTable
    +    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
    +    // streaming ingest 10 rows
    +    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
    +    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
    +      identifier)
    +    thread.start()
    +    Thread.sleep(2000)
    +    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
    +    Thread.sleep(5000)
    +    thread.interrupt()
    +    checkAnswer(
    +      sql("select count(*) from streaming.agg_table2"),
    +      Seq(Row(10)))
    +    sql("create datamap p1 on table agg_table2 using 'preaggregate' as select name, sum(salary) from agg_table2 group by name")
    +    sql("alter table agg_table2 finish streaming")
    +    sql("alter table agg_table2 compact 'streaming'")
    +    // Data should be loaded into aggregate table as hand-off is fired
    +    checkAnswer(sql("select * from agg_table2_p1"),
    +      Seq(
    +        Row("name_10", 200000.0),
    +        Row("name_11", 220000.0),
    +        Row("name_12", 240000.0),
    +        Row("name_13", 260000.0),
    +        Row("name_14", 280000.0)))
    +    sql("drop table agg_table2")
    +  }
    +
    +  test("test if data is loaded in aggregate table after handoff is done for streaming table") {
    +    createTable(tableName = "agg_table3", streaming = true, withBatchLoad = false)
    +    val identifier = new TableIdentifier("agg_table3", Option("streaming"))
    +    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
    +      .asInstanceOf[CarbonRelation].metaData.carbonTable
    +    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
    +    // streaming ingest 10 rows
    +    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
    +    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
    +      identifier)
    +    thread.start()
    +    Thread.sleep(2000)
    +    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
    +    Thread.sleep(5000)
    +    thread.interrupt()
    +    checkAnswer(
    +      sql("select count(*) from streaming.agg_table3"),
    +      Seq(Row(10)))
    +    sql("alter table agg_table3 finish streaming")
    +    sql("alter table agg_table3 compact 'streaming'")
    +    sql("create datamap p1 on table agg_table3 using 'preaggregate' as select name, sum(salary) from agg_table3 group by name")
    +    // Data should be loaded into aggregate table as hand-off is fired
    +    checkAnswer(sql("select * from agg_table3_p1"),
    +      Seq(
    +        Row("name_10", 200000.0),
    +        Row("name_11", 220000.0),
    +        Row("name_12", 240000.0),
    +        Row("name_13", 260000.0),
    +        Row("name_14", 280000.0)))
    +  }
    +
    --- End diff --

    The above 3 test cases add 21 seconds of sleep time on top of their running time. That is a huge wait and will increase the UT running time considerably; the complete test file already has nearly 30 seconds of sleep. @QiangCai, can we discuss together and try to find a better solution to avoid this delay in test case execution?

---
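One generic way to cut the fixed sleeps (a sketch, not something proposed in this thread) is to poll for the expected state with a deadline, so the happy path returns as soon as the data arrives:

    object PollingExample {
      // Poll a condition until it holds or the deadline passes, instead of
      // sleeping a fixed interval; returns the final state of the condition.
      def waitUntil(timeoutMs: Long, pollMs: Long = 200)(condition: => Boolean): Boolean = {
        val deadline = System.currentTimeMillis() + timeoutMs
        while (System.currentTimeMillis() < deadline) {
          if (condition) return true
          Thread.sleep(pollMs)
        }
        condition
      }
    }

    // Usage in a test (names as in the suite above):
    //   assert(PollingExample.waitUntil(timeoutMs = 10000) {
    //     sql("select count(*) from streaming.agg_table").collect().head.getLong(0) == 10
    //   })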
Github user CarbonDataQA commented on the issue:
    https://github.com/apache/carbondata/pull/2084

    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3305/

---
Github user CarbonDataQA commented on the issue:
    https://github.com/apache/carbondata/pull/2084

    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4533/

---
Github user ravipesala commented on the issue:
    https://github.com/apache/carbondata/pull/2084

    SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4033/

---
Github user QiangCai commented on a diff in the pull request:
    https://github.com/apache/carbondata/pull/2084#discussion_r176998393

    --- Diff: integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---
    (quotes the same three test cases shown above)
    --- End diff --

    @manishgupta88 I will try to reduce the run time.

---
Github user CarbonDataQA commented on the issue:
    https://github.com/apache/carbondata/pull/2084

    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3388/

---