[GitHub] carbondata pull request #2083: [WIP]Pre-Aggregate Streaming table handling


[GitHub] carbondata pull request #2083: [CARBONDATA-2269]Support Query On PreAggregat...

qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2083#discussion_r177145374
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -349,10 +442,29 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
                       carbonTable,
                       agg)
                   isPlanUpdated = true
    -              Aggregate(updatedGroupExp,
    +              val updateAggPlan = Aggregate(updatedGroupExp,
                     updatedAggExp,
                     CarbonReflectionUtils
                       .getSubqueryAlias(sparkSession, Some(alias), newChild, None))
    +              if(carbonTable.isStreamingTable) {
    --- End diff --
   
    Add a space after `if`.


---

[GitHub] carbondata pull request #2083: [CARBONDATA-2269]Support Query On PreAggregat...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2083#discussion_r177145511
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -405,11 +517,30 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
                       carbonTable,
                       agg)
                   isPlanUpdated = true
    -              Aggregate(updatedGroupExp,
    +              val updateAggPlan = Aggregate(updatedGroupExp,
                     updatedAggExp,
                     Filter(updatedFilterExpression.get,
                       CarbonReflectionUtils
                         .getSubqueryAlias(sparkSession, Some(alias), newChild, None)))
    +              if(carbonTable.isStreamingTable) {
    --- End diff --
   
    This code is duplicated 3 times.


---

[GitHub] carbondata pull request #2083: [CARBONDATA-2269]Support Query On PreAggregat...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2083#discussion_r177145680
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -466,7 +594,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
                       carbonTable,
                       agg)
                   isPlanUpdated = true
    -              Aggregate(updatedGroupExp,
    +              val updateAggPlan = Aggregate(updatedGroupExp,
    --- End diff --
   
    Move `updatedGroupExp` to the next line.


---

[GitHub] carbondata issue #2083: [CARBONDATA-2269]Support Query On PreAggregate table...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2083
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3381/



---

[GitHub] carbondata issue #2083: [CARBONDATA-2269]Support Query On PreAggregate table...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2083
 
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4104/



---

[GitHub] carbondata issue #2083: [CARBONDATA-2269]Support Query On PreAggregate table...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2083
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4606/



---

[GitHub] carbondata pull request #2083: [CARBONDATA-2269]Support Query On PreAggregat...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2083#discussion_r177619092
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/Segment.java ---
    @@ -50,9 +51,11 @@ public String getSegmentFileName() {
       }
     
       public static List<Segment> toSegmentList(String[] segmentIds) {
    +    int startIndex =
    --- End diff --
   
    Don't change this method; it is better to add a new method.


---

[GitHub] carbondata pull request #2083: [CARBONDATA-2269]Support Query On PreAggregat...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2083#discussion_r177619510
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/Segment.java ---
    @@ -50,9 +51,11 @@ public String getSegmentFileName() {
       }
     
       public static List<Segment> toSegmentList(String[] segmentIds) {
    +    int startIndex =
    +        segmentIds[0].equals(CarbonCommonConstants.PREAGGQUERY_SEGMENTS_CONSTANTS) ? 1 : 0;
    --- End diff --
   
    Can you describe why this check is required?
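
A minimal Java sketch of the "add a new method" suggestion for `toSegmentList`, under stated assumptions: plain `String` ids stand in for `Segment` objects, `PREAGG_MARKER` is a hypothetical placeholder for `CarbonCommonConstants.PREAGGQUERY_SEGMENTS_CONSTANTS` (its real value is not shown in the diff), and the method name `toSegmentListSkippingMarker` is invented for illustration. It is not the code that was merged.

```java
import java.util.ArrayList;
import java.util.List;

class SegmentListSketch {
  // Hypothetical placeholder; the real constant's value is not shown in the diff.
  static final String PREAGG_MARKER = "preagg-marker";

  // Existing behaviour, left untouched: every id in the array is converted.
  static List<String> toSegmentList(String[] segmentIds) {
    List<String> list = new ArrayList<>(segmentIds.length);
    for (String id : segmentIds) {
      list.add(id);
    }
    return list;
  }

  // New method (hypothetical name): skips a leading marker entry if present,
  // so callers that never pass the marker keep using toSegmentList unchanged.
  static List<String> toSegmentListSkippingMarker(String[] segmentIds) {
    // Guard against an empty array before looking at the first element.
    int startIndex =
        segmentIds.length > 0 && PREAGG_MARKER.equals(segmentIds[0]) ? 1 : 0;
    List<String> list = new ArrayList<>(segmentIds.length - startIndex);
    for (int i = startIndex; i < segmentIds.length; i++) {
      list.add(segmentIds[i]);
    }
    return list;
  }
}
```

Keeping the marker handling in its own method means existing callers of `toSegmentList` see no behaviour change, which appears to be the point of the review comment.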


---

[GitHub] carbondata pull request #2083: [CARBONDATA-2269]Support Query On PreAggregat...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2083#discussion_r177619930
 
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggStreaming.scala ---
    @@ -0,0 +1,79 @@
    +package org.apache.carbondata.integration.spark.testsuite.preaggregate
    +
    +import org.apache.spark.sql.CarbonDatasourceHadoopRelation
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Union}
    +import org.apache.spark.sql.execution.datasources.LogicalRelation
    +import org.apache.spark.sql.test.util.QueryTest
    +import org.scalatest.{BeforeAndAfterAll, Ignore}
    +
    +
    +@Ignore
    +class TestPreAggStreaming extends QueryTest with BeforeAndAfterAll {
    +  override def beforeAll: Unit = {
    +    sql("drop table if exists mainTable")
    +    sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
    +    sql("create datamap agg0 on table mainTable using 'preaggregate' as select name from mainTable group by name")
    +    sql("create datamap agg1 on table mainTable using 'preaggregate' as select name,sum(age) from mainTable group by name")
    +    sql("create datamap agg2 on table mainTable using 'preaggregate' as select name,avg(age) from mainTable group by name")
    +    sql("create datamap agg3 on table mainTable using 'preaggregate' as select name,sum(CASE WHEN age=35 THEN id ELSE 0 END) from mainTable group by name")
    +  }
    +
    +  test("Test Streaming table plan with only projection column") {
    +    val df = sql("select name from maintable group by name")
    +    assert(validateStreamingTablePlan(df.queryExecution.analyzed))
    +  }
    +
    +  test("Test Streaming table plan with only projection column and order by") {
    +    val df = sql("select name from maintable group by name")
    +    assert(validateStreamingTablePlan(df.queryExecution.analyzed))
    +  }
    +
    +  test("Test Streaming table plan with sum aggregator") {
    +    val df = sql("select name, sum(age) from maintable group by name order by name")
    +    assert(validateStreamingTablePlan(df.queryExecution.analyzed))
    +  }
    +
    +  test("Test Streaming table plan with sum aggregator and order by") {
    +    val df = sql("select name, sum(age) from maintable group by name order by name")
    +    assert(validateStreamingTablePlan(df.queryExecution.analyzed))
    +  }
    +
    +  test("Test Streaming table plan with avg aggregator") {
    +    val df = sql("select name, avg(age) from maintable group by name")
    +    assert(validateStreamingTablePlan(df.queryExecution.analyzed))
    +  }
    +
    +  test("Test Streaming table plan with expression ") {
    +    val df = sql("select name, sum(CASE WHEN age=35 THEN id ELSE 0 END) from maintable group by name order by name")
    +    assert(validateStreamingTablePlan(df.queryExecution.analyzed))
    +  }
    +
    +  /**
    +   * Below method will be used validate whether plan is already updated in case of streaming table
    +   * In case of streaming table it will add UnionNode to get the data from fact and aggregate both
    +   * as aggregate table will be updated after each handoff.
    +   * So if plan is already updated no need to transform the plan again
    +   * @param logicalPlan
    +   * query plan
    +   * @return whether need to update the query plan or not
    +   */
    +  def validateStreamingTablePlan(logicalPlan: LogicalPlan) : Boolean = {
    +    var isChildTableExists: Boolean = false
    +    logicalPlan.transform {
    +      case union @ Union(Seq(plan1, plan2)) =>
    +        plan2.collect{
    +          case logicalRelation: LogicalRelation if
    +          logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    +          logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
    +            .isChildDataMap =>
    +            isChildTableExists = false
    --- End diff --
   
    I think it should be `true`.


---

[GitHub] carbondata pull request #2083: [CARBONDATA-2269]Support Query On PreAggregat...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2083#discussion_r177683347
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -262,12 +351,12 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         var isPlanUpdated = false
         val updatedPlan = logicalPlan.transform {
           case agg@Aggregate(
    -        grExp,
    -        aggExp,
    -        CarbonSubqueryAlias(alias1, child@CarbonSubqueryAlias(alias2, l: LogicalRelation)))
    +      grExp,
    +      aggExp,
    +      CarbonSubqueryAlias(alias1, child@CarbonSubqueryAlias(alias2, l: LogicalRelation)))
    --- End diff --
   
    Format it properly


---

[GitHub] carbondata pull request #2083: [CARBONDATA-2269]Support Query On PreAggregat...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2083#discussion_r177685409
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -493,6 +629,210 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         updatedPlan
       }
     
    +  /**
    +   * Method to get the aggregate query plan
    +   * @param aggPlan
    +   * aggregate table query plan
    +   * @param grExp
    +   * fact group by expression
    +   * @param aggExp
    +   * fact aggregate expression
    +   * @param carbonTable
    +   * fact table
    +   * @param aggregationDataMapSchema
    +   * selected aggregation data map
    +   * @param factAggPlan
    +   * fact aggregate query plan
    +   * @return updated plan
    +   */
    +  def getAggregateQueryPlan(aggPlan: LogicalPlan,
    +      grExp: Seq[Expression],
    +      aggExp: Seq[NamedExpression],
    +      carbonTable: CarbonTable,
    +      aggregationDataMapSchema: DataMapSchema,
    +      factAggPlan: LogicalPlan): LogicalPlan = {
    +    // to handle streaming table with pre aggregate
    +    if (carbonTable.isStreamingTable) {
    +      setSegmentsForStreaming(carbonTable, aggregationDataMapSchema)
    --- End diff --
   
    There is a possibility of losing data from the fact table if you set only the segments for the fact table and read the streaming segments directly from the fact table's streaming segments: in case of handoff, the streaming segments are marked for delete and are no longer accessible during the fact read. So also set all the current fact segments here, along with the aggregate segments. During the fact read, compare the current segments with the set segments and read all intersecting segments.
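
A literal reading of the last sentence above can be sketched in Java as follows. This is an illustration only, assuming that "intersected" means plain set intersection over segment ids; the class and method names are hypothetical and plain `String` ids stand in for CarbonData segments.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class SegmentIntersectionSketch {
  // Returns the segments that were set while building the plan and are
  // still present in the current segment list at read time, keeping the
  // order in which they were set.
  static List<String> segmentsToRead(List<String> setSegments,
                                     List<String> currentSegments) {
    Set<String> current = new HashSet<>(currentSegments);
    List<String> result = new ArrayList<>();
    for (String id : setSegments) {
      if (current.contains(id)) {
        result.add(id);
      }
    }
    return result;
  }
}
```

For example, with set segments `0, 1, 2` and current segments `1, 2, 3`, the sketch reads segments `1` and `2`.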


---

[GitHub] carbondata pull request #2083: [CARBONDATA-2269]Support Query On PreAggregat...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kumarvishal09 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2083#discussion_r177979593
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -493,6 +629,210 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         updatedPlan
       }
     
    +  /**
    +   * Method to get the aggregate query plan
    +   * @param aggPlan
    +   * aggregate table query plan
    +   * @param grExp
    +   * fact group by expression
    +   * @param aggExp
    +   * fact aggregate expression
    +   * @param carbonTable
    +   * fact table
    +   * @param aggregationDataMapSchema
    +   * selected aggregation data map
    +   * @param factAggPlan
    +   * fact aggregate query plan
    +   * @return updated plan
    +   */
    +  def getAggregateQueryPlan(aggPlan: LogicalPlan,
    +      grExp: Seq[Expression],
    +      aggExp: Seq[NamedExpression],
    +      carbonTable: CarbonTable,
    +      aggregationDataMapSchema: DataMapSchema,
    +      factAggPlan: LogicalPlan): LogicalPlan = {
    +    // to handle streaming table with pre aggregate
    +    if (carbonTable.isStreamingTable) {
    +      setSegmentsForStreaming(carbonTable, aggregationDataMapSchema)
    --- End diff --
   
    fixed


---

[GitHub] carbondata issue #2083: [CARBONDATA-2269]Support Query On PreAggregate table...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2083
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3426/



---

[GitHub] carbondata issue #2083: [CARBONDATA-2269]Support Query On PreAggregate table...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2083
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4653/



---

[GitHub] carbondata issue #2083: [CARBONDATA-2269]Support Query On PreAggregate table...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2083
 
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4163/



---

[GitHub] carbondata issue #2083: [CARBONDATA-2269]Support Query On PreAggregate table...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kumarvishal09 commented on the issue:

    https://github.com/apache/carbondata/pull/2083
 
    retest this please


---

[GitHub] carbondata issue #2083: [CARBONDATA-2269]Support Query On PreAggregate table...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2083
 
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3430/



---

[GitHub] carbondata issue #2083: [CARBONDATA-2269]Support Query On PreAggregate table...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2083
 
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4657/



---

[GitHub] carbondata issue #2083: [CARBONDATA-2269]Support Query On PreAggregate table...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2083
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4165/



---

[GitHub] carbondata issue #2083: [CARBONDATA-2269]Support Query On PreAggregate table...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2083
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4676/



---