[GitHub] carbondata pull request #2084: [WIP][CARBONDATA-1522] Support preaggregate t...


qiuchenjian-2
GitHub user kunal642 opened a pull request:

    https://github.com/apache/carbondata/pull/2084

    [WIP][CARBONDATA-1522] Support preaggregate table creation and loading on streaming tables

    1. Added support for creating a preaggregate datamap on a streaming table.
    2. Added support for loading data into the datamap after handoff is fired for a streaming table.
    3. Added transaction support for datamap loading.
   
   
    Be sure to do all of the following checklist to help us incorporate
    your contribution quickly and easily:
   
     - [X] Any interfaces changed?
     
     - [X] Any backward compatibility impacted?
     
     - [X] Document update required?
   
     - [X] Testing done
            Please provide details on
            - Whether new unit test cases have been added or why no new tests are required?
            - How it is tested? Please attach test report.
            - Is it a performance related change? Please attach the performance test report.
            - Any additional information to help reviewers in testing this change.
           
     - [X] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
   


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kunal642/carbondata streaming_preagg_support

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2084.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2084
   
----
commit 8a86c43fa343fbfa4d43dc1d49c242c28f2b6cc8
Author: kunal642 <kunalkapoor642@...>
Date:   2018-03-20T06:11:17Z

    1. Added support to create preaggregate datamap on streaming table
    2. Added support to load data into datamap after handoff is fired for streaming table
    3. Added transaction support for datamap loading

commit 631dba88ab18f93543325b560e9fcaa311f05dd6
Author: kunal642 <kunalkapoor642@...>
Date:   2018-03-20T07:04:08Z

    refactor

commit 8632c2baf3676f60f38691f334891737a229954d
Author: kunal642 <kunalkapoor642@...>
Date:   2018-03-21T04:57:03Z

    added test cases

----


---

[GitHub] carbondata issue #2084: [WIP][CARBONDATA-1522] Support preaggregate table cr...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2084
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3232/



---

[GitHub] carbondata issue #2084: [WIP][CARBONDATA-1522] Support preaggregate table cr...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2084
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4465/



---

[GitHub] carbondata issue #2084: [WIP][CARBONDATA-1522] Support preaggregate table cr...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2084
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4466/



---

[GitHub] carbondata issue #2084: [WIP][CARBONDATA-1522] Support preaggregate table cr...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2084
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3233/



---

[GitHub] carbondata issue #2084: [WIP][CARBONDATA-1522] Support preaggregate table cr...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2084
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3982/



---

[GitHub] carbondata issue #2084: [WIP][CARBONDATA-1522] Support preaggregate table cr...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2084
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3983/



---

[GitHub] carbondata issue #2084: [WIP][CARBONDATA-1522] Support preaggregate table cr...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2084
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4475/



---

[GitHub] carbondata issue #2084: [WIP][CARBONDATA-1522] Support preaggregate table cr...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2084
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3243/



---

[GitHub] carbondata issue #2084: [WIP][CARBONDATA-1522] Support preaggregate table cr...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2084
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3991/



---

[GitHub] carbondata pull request #2084: [WIP][CARBONDATA-1522] Support preaggregate t...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2084#discussion_r176075242
 
    --- Diff: examples/flink/src/main/scala/org/apache/carbondata/examples/FlinkExample.scala ---
    @@ -21,6 +21,7 @@ import org.apache.flink.api.java.ExecutionEnvironment
     import org.apache.hadoop.conf.Configuration
     import org.apache.hadoop.mapreduce.Job
     
    +import org.apache.carbondata.examples.util.ExampleUtils
    --- End diff --
   
    Why is this import required?


---

[GitHub] carbondata pull request #2084: [WIP][CARBONDATA-1522] Support preaggregate t...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2084#discussion_r176076016
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -199,9 +199,12 @@ object LoadPostAggregateListener extends OperationEventListener {
        * @param event
        */
       override def onEvent(event: Event, operationContext: OperationContext): Unit = {
    -    val loadEvent = event.asInstanceOf[LoadTablePreStatusUpdateEvent]
    +    val carbonLoadModel =
    +      event match {
    +        case e: LoadTablePreStatusUpdateEvent => e.getCarbonLoadModel
    +        case e: LoadTablePostExecutionEvent => e.getCarbonLoadModel
    +      }
    --- End diff --
   
    Please add a default case for other event types.
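
A minimal sketch of what such a default case could look like. The event and load-model types below are hypothetical stand-ins, not the real CarbonData classes, and throwing `IllegalArgumentException` is only one possible choice. The point is that an event matching neither case fails with a descriptive message instead of a bare `scala.MatchError`.

```scala
object DefaultCaseSketch {
  // Hypothetical stand-ins for the CarbonData event classes; only the shape matters.
  trait Event
  final case class LoadModel(tableName: String)
  final case class LoadTablePreStatusUpdateEvent(model: LoadModel) extends Event
  final case class LoadTablePostExecutionEvent(model: LoadModel) extends Event
  final case class UnrelatedEvent(name: String) extends Event

  // Returns the load model for the two supported events and fails with a
  // descriptive error for anything else.
  def loadModelOf(event: Event): LoadModel = event match {
    case e: LoadTablePreStatusUpdateEvent => e.model
    case e: LoadTablePostExecutionEvent => e.model
    case other =>
      throw new IllegalArgumentException(
        s"Unsupported event type: ${other.getClass.getName}")
  }
}
```

Whether an unexpected event should throw or simply be ignored depends on how the listener is registered; the thread does not settle that.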


---

[GitHub] carbondata pull request #2084: [WIP][CARBONDATA-1522] Support preaggregate t...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2084#discussion_r176077283
 
    --- Diff: streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala ---
    @@ -299,8 +302,8 @@ object StreamHandoffRDD {
       def executeStreamingHandoff(
           carbonLoadModel: CarbonLoadModel,
           sparkSession: SparkSession,
    -      handoffSegmenId: String
    -  ): Unit = {
    +      operationContext: OperationContext,
    --- End diff --
   
    Why is operationContext needed here?


---

[GitHub] carbondata pull request #2084: [WIP][CARBONDATA-1522] Support preaggregate t...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2084#discussion_r176649884
 
    --- Diff: streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala ---
    @@ -299,8 +302,8 @@ object StreamHandoffRDD {
       def executeStreamingHandoff(
           carbonLoadModel: CarbonLoadModel,
           sparkSession: SparkSession,
    -      handoffSegmenId: String
    -  ): Unit = {
    +      operationContext: OperationContext,
    --- End diff --
   
    The operation context holds the CarbonLoadDataCommands, which are needed in PreAggregateListeners.
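
The general pattern can be sketched as follows, assuming the context acts as a per-operation key-value holder that one step fills and a later listener reads. `Context`, `ChildLoadCommand`, the `_loadCommand` key suffix, and both helper functions are hypothetical names invented for illustration; they are not the actual `OperationContext` API.

```scala
import scala.collection.mutable

object ContextSketch {
  // Hypothetical stand-in for OperationContext: a per-operation key-value holder.
  final class Context {
    private val props = mutable.Map.empty[String, Any]
    def setProperty(key: String, value: Any): Unit = props(key) = value
    def getProperty(key: String): Option[Any] = props.get(key)
  }

  // Hypothetical stand-in for a load command prepared for a child (datamap) table.
  final case class ChildLoadCommand(childTable: String)

  // The caller creates one context and stores a command per child table in it.
  def prepare(context: Context, childTables: Seq[String]): Unit =
    childTables.foreach { t =>
      context.setProperty(s"${t}_loadCommand", ChildLoadCommand(t))
    }

  // A listener invoked later with the same context can look the command up again.
  def commandFor(context: Context, childTable: String): Option[ChildLoadCommand] =
    context.getProperty(s"${childTable}_loadCommand").collect {
      case c: ChildLoadCommand => c
    }
}
```

Under this assumption, passing the same context instance into the handoff is what lets the listener find the commands prepared earlier.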


---

[GitHub] carbondata pull request #2084: [WIP][CARBONDATA-1522] Support preaggregate t...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2084#discussion_r176651976
 
    --- Diff: integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---
    @@ -249,6 +251,91 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
         assertResult(exceptedRow)(row)
       }
     
    +  test("test preaggregate table creation on streaming table without handoff") {
    +    val identifier = new TableIdentifier("agg_table", Option("streaming"))
    +    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
    +      .asInstanceOf[CarbonRelation].metaData.carbonTable
    +    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
    +    // streaming ingest 10 rows
    +    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
    +    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
    +      identifier)
    +    thread.start()
    +    Thread.sleep(2000)
    +    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
    +    Thread.sleep(5000)
    +    thread.interrupt()
    +    checkAnswer(
    +      sql("select count(*) from streaming.agg_table"),
    +      Seq(Row(10)))
    +    sql("create datamap p1 on table agg_table using 'preaggregate' as select name, sum(salary) from agg_table group by name")
    +    // No data should be loaded into aggregate table as hand-off is not yet fired
    +    checkAnswer(sql("select * from agg_table_p1"), Seq())
    +  }
    +
    +  test("test if data is loaded into preaggregate after handoff is fired") {
    +    createTable(tableName = "agg_table2", streaming = true, withBatchLoad = false)
    +    val identifier = new TableIdentifier("agg_table2", Option("streaming"))
    +    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
    +      .asInstanceOf[CarbonRelation].metaData.carbonTable
    +    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
    +    // streaming ingest 10 rows
    +    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
    +    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
    +      identifier)
    +    thread.start()
    +    Thread.sleep(2000)
    +    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
    +    Thread.sleep(5000)
    +    thread.interrupt()
    +    checkAnswer(
    +      sql("select count(*) from streaming.agg_table2"),
    +      Seq(Row(10)))
    +    sql("create datamap p1 on table agg_table2 using 'preaggregate' as select name, sum(salary) from agg_table2 group by name")
    +    sql("alter table agg_table2 finish streaming")
    +    sql("alter table agg_table2 compact 'streaming'")
    +    // Data should be loaded into aggregate table as hand-off is fired
    +    checkAnswer(sql("select * from agg_table2_p1"),
    +      Seq(
    +        Row("name_10", 200000.0),
    +        Row("name_11", 220000.0),
    +        Row("name_12", 240000.0),
    +        Row("name_13", 260000.0),
    +        Row("name_14", 280000.0)))
    +    sql("drop table agg_table2")
    +  }
    +
    +  test("test if data is loaded in aggregate table after handoff is done for streaming table") {
    +    createTable(tableName = "agg_table3", streaming = true, withBatchLoad = false)
    +    val identifier = new TableIdentifier("agg_table3", Option("streaming"))
    +    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
    +      .asInstanceOf[CarbonRelation].metaData.carbonTable
    +    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
    +    // streaming ingest 10 rows
    +    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
    +    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
    +      identifier)
    +    thread.start()
    +    Thread.sleep(2000)
    +    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
    +    Thread.sleep(5000)
    +    thread.interrupt()
    +    checkAnswer(
    +      sql("select count(*) from streaming.agg_table3"),
    +      Seq(Row(10)))
    +    sql("alter table agg_table3 finish streaming")
    +    sql("alter table agg_table3 compact 'streaming'")
    +    sql("create datamap p1 on table agg_table3 using 'preaggregate' as select name, sum(salary) from agg_table3 group by name")
    +    // Data should be loaded into aggregate table as hand-off is fired
    +    checkAnswer(sql("select * from agg_table3_p1"),
    +      Seq(
    +        Row("name_10", 200000.0),
    +        Row("name_11", 220000.0),
    +        Row("name_12", 240000.0),
    +        Row("name_13", 260000.0),
    +        Row("name_14", 280000.0)))
    +  }
    +
    --- End diff --
   
    The 3 test cases above add 21 seconds of sleep time (3 x (2 + 5) seconds) on top of their actual running time. I think this is a huge wait and will increase the UT running time considerably.
    In the complete test file, we already have nearly 30 seconds of sleep time.
    @QiangCai, can we discuss this together and try to find a better solution to avoid this delay in test case execution?
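
One common way to shorten such waits is to poll for the expected state with an upper bound instead of sleeping for a fixed time. The helper below is a minimal sketch of that idea; `awaitCondition` and its parameters are hypothetical names and not part of the CarbonData test utilities.

```scala
object AwaitSketch {
  /**
   * Polls `condition` every `pollMillis` until it holds or `timeoutMillis` elapses.
   * Returns true as soon as the condition holds, false on timeout.
   */
  def awaitCondition(timeoutMillis: Long, pollMillis: Long = 100L)(
      condition: => Boolean): Boolean = {
    val deadline = System.nanoTime() + timeoutMillis * 1000000L
    var satisfied = condition
    while (!satisfied && System.nanoTime() < deadline) {
      Thread.sleep(pollMillis)
      satisfied = condition
    }
    satisfied
  }
}
```

In a test, a fixed `Thread.sleep(5000)` could then become something like `assert(AwaitSketch.awaitCondition(5000L) { ingestedRowCount() == 10 })`, where `ingestedRowCount()` is a hypothetical helper that runs the count query. The test returns as soon as the rows are visible and only waits the full 5 seconds in the failure case. ScalaTest's `Eventually` trait offers a similar retry-until-timeout facility.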


---

[GitHub] carbondata issue #2084: [CARBONDATA-1522] Support preaggregate table creatio...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2084
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3305/



---

[GitHub] carbondata issue #2084: [CARBONDATA-1522] Support preaggregate table creatio...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2084
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4533/



---

[GitHub] carbondata issue #2084: [CARBONDATA-1522] Support preaggregate table creatio...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2084
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4033/



---

[GitHub] carbondata pull request #2084: [CARBONDATA-1522] Support preaggregate table ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2084#discussion_r176998393
 
    --- Diff: integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---
    @@ -249,6 +251,91 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
         assertResult(exceptedRow)(row)
       }
     
    +  test("test preaggregate table creation on streaming table without handoff") {
    +    val identifier = new TableIdentifier("agg_table", Option("streaming"))
    +    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
    +      .asInstanceOf[CarbonRelation].metaData.carbonTable
    +    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
    +    // streaming ingest 10 rows
    +    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
    +    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
    +      identifier)
    +    thread.start()
    +    Thread.sleep(2000)
    +    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
    +    Thread.sleep(5000)
    +    thread.interrupt()
    +    checkAnswer(
    +      sql("select count(*) from streaming.agg_table"),
    +      Seq(Row(10)))
    +    sql("create datamap p1 on table agg_table using 'preaggregate' as select name, sum(salary) from agg_table group by name")
    +    // No data should be loaded into aggregate table as hand-off is not yet fired
    +    checkAnswer(sql("select * from agg_table_p1"), Seq())
    +  }
    +
    +  test("test if data is loaded into preaggregate after handoff is fired") {
    +    createTable(tableName = "agg_table2", streaming = true, withBatchLoad = false)
    +    val identifier = new TableIdentifier("agg_table2", Option("streaming"))
    +    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
    +      .asInstanceOf[CarbonRelation].metaData.carbonTable
    +    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
    +    // streaming ingest 10 rows
    +    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
    +    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
    +      identifier)
    +    thread.start()
    +    Thread.sleep(2000)
    +    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
    +    Thread.sleep(5000)
    +    thread.interrupt()
    +    checkAnswer(
    +      sql("select count(*) from streaming.agg_table2"),
    +      Seq(Row(10)))
    +    sql("create datamap p1 on table agg_table2 using 'preaggregate' as select name, sum(salary) from agg_table2 group by name")
    +    sql("alter table agg_table2 finish streaming")
    +    sql("alter table agg_table2 compact 'streaming'")
    +    // Data should be loaded into aggregate table as hand-off is fired
    +    checkAnswer(sql("select * from agg_table2_p1"),
    +      Seq(
    +        Row("name_10", 200000.0),
    +        Row("name_11", 220000.0),
    +        Row("name_12", 240000.0),
    +        Row("name_13", 260000.0),
    +        Row("name_14", 280000.0)))
    +    sql("drop table agg_table2")
    +  }
    +
    +  test("test if data is loaded in aggregate table after handoff is done for streaming table") {
    +    createTable(tableName = "agg_table3", streaming = true, withBatchLoad = false)
    +    val identifier = new TableIdentifier("agg_table3", Option("streaming"))
    +    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
    +      .asInstanceOf[CarbonRelation].metaData.carbonTable
    +    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
    +    // streaming ingest 10 rows
    +    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
    +    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
    +      identifier)
    +    thread.start()
    +    Thread.sleep(2000)
    +    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
    +    Thread.sleep(5000)
    +    thread.interrupt()
    +    checkAnswer(
    +      sql("select count(*) from streaming.agg_table3"),
    +      Seq(Row(10)))
    +    sql("alter table agg_table3 finish streaming")
    +    sql("alter table agg_table3 compact 'streaming'")
    +    sql("create datamap p1 on table agg_table3 using 'preaggregate' as select name, sum(salary) from agg_table3 group by name")
    +    // Data should be loaded into aggregate table as hand-off is fired
    +    checkAnswer(sql("select * from agg_table3_p1"),
    +      Seq(
    +        Row("name_10", 200000.0),
    +        Row("name_11", 220000.0),
    +        Row("name_12", 240000.0),
    +        Row("name_13", 260000.0),
    +        Row("name_14", 280000.0)))
    +  }
    +
    --- End diff --
   
    @manishgupta88
    I will try to reduce the run time.


---

[GitHub] carbondata issue #2084: [CARBONDATA-1522] Support preaggregate table creatio...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2084
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3388/



---