[GitHub] carbondata pull request #1728: [CARBONDATA-1926] Expression support inside a...

qiuchenjian-2
GitHub user kumarvishal09 opened a pull request:

    https://github.com/apache/carbondata/pull/1728

    [CARBONDATA-1926] Expression support inside aggregate function for Query

    This PR adds support for transforming the query plan to use a pre-aggregate table when an aggregate function in the query contains an expression, e.g. sum(case when age=35 then id else 0 end).
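The core check can be sketched in plain Scala. This is a simplified, hypothetical model rather than CarbonData's implementation: the `Expr` tree, `normalize` and `selectDataMaps` are illustrative names, and lower-casing column names only loosely stands in for the Catalyst `sameResult` comparison the rule relies on. A datamap qualifies only when every aggregate expression in the query has an equal counterpart among the aggregates the datamap stores, similar to the `forall`/`exists` check in `validateAggregateExpression` in the diff quoted later in this thread. Grouping columns and casts (`age` is a string column in the test table) are ignored, which is why `agg2` is left out.

```scala
object PreAggMatching {
  // Hypothetical, simplified expression tree (a stand-in for Catalyst expressions).
  sealed trait Expr
  final case class Col(name: String) extends Expr
  final case class Lit(value: Int) extends Expr
  final case class EqualTo(left: Expr, right: Expr) extends Expr
  final case class CaseWhen(cond: Expr, thenExpr: Expr, elseExpr: Expr) extends Expr
  final case class Sum(child: Expr) extends Expr
  final case class Count(child: Expr) extends Expr

  // A pre-aggregate datamap, reduced to its name and the aggregates it stores.
  final case class DataMap(name: String, aggregates: Seq[Expr])

  // Lower-case column names so that `AGE` and `age` compare equal; a rough
  // stand-in for the semantic comparison Spark performs with `sameResult`.
  def normalize(e: Expr): Expr = e match {
    case Col(n)            => Col(n.toLowerCase)
    case l: Lit            => l
    case EqualTo(l, r)     => EqualTo(normalize(l), normalize(r))
    case CaseWhen(c, t, o) => CaseWhen(normalize(c), normalize(t), normalize(o))
    case Sum(c)            => Sum(normalize(c))
    case Count(c)          => Count(normalize(c))
  }

  // Keep only datamaps that store every aggregate expression of the query.
  def selectDataMaps(queryAggs: Seq[Expr], dataMaps: Seq[DataMap]): Seq[DataMap] =
    dataMaps.filter { dm =>
      val stored = dm.aggregates.map(normalize)
      queryAggs.forall(q => stored.contains(normalize(q)))
    }

  // sum(case when age = v then id else 0 end)
  def sumIdWhenAge(v: Int): Expr =
    Sum(CaseWhen(EqualTo(Col("age"), Lit(v)), Col("id"), Lit(0)))

  // Aggregates of the datamaps created in the test suite (grouping columns ignored).
  val exampleDataMaps: Seq[DataMap] = Seq(
    DataMap("agg0", Seq(Count(Col("age")))),
    DataMap("agg1", Seq(sumIdWhenAge(35))),
    DataMap("agg3", Seq(sumIdWhenAge(27))),
    DataMap("agg4", Seq(sumIdWhenAge(27), sumIdWhenAge(35))))

  def main(args: Array[String]): Unit = {
    val query = Seq(
      sumIdWhenAge(27),
      Sum(CaseWhen(EqualTo(Col("AGE"), Lit(35)), Col("ID"), Lit(0))))
    println(selectDataMaps(query, exampleDataMaps).map(_.name)) // List(agg4)
  }
}
```

For the query with both `sum(case ...)` expressions the sketch keeps only `agg4`, consistent with the table the test suite expects for that query (`maintable_agg4`). How the real rule chooses among several qualifying datamaps is not modelled here.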
   
     - [ ] Any interfaces changed?
     NA
     - [ ] Any backward compatibility impacted?
     NA
     - [ ] Document update required?
     NA
     - [ ] Testing done
         Added UTs for pre-aggregate table selection and data validation
     - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
   


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kumarvishal09/incubator-carbondata ExpressionSupportInQuery

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/1728.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1728
   
----
commit 361790641779d881e21f54d7c0f78c19fcf3490e
Author: kumarvishal <kumarvishal.1802@...>
Date:   2017-12-20T10:16:02Z

    Added code to support expression inside aggregate function

commit 3344f0fd3dd5cca31e26d2f1909fce600e8dd8e8
Author: kumarvishal <kumarvishal.1802@...>
Date:   2017-12-25T09:34:39Z

    Added code to support expression inside aggregateExpression in query

----


---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][Pre-Aggregate] Expression support ...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2364/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][Pre-Aggregate] Expression support ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1152/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][Pre-Aggregate] Expression support ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2578/



---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][Pre-Aggregate] Expression s...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r158934173
 
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala ---
    @@ -0,0 +1,105 @@
    +package org.apache.carbondata.integration.spark.testsuite.preaggregate
    +
    +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Row}
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.datasources.LogicalRelation
    +import org.apache.spark.sql.hive.CarbonRelation
    +import org.apache.spark.sql.test.util.QueryTest
    +import org.scalatest.BeforeAndAfterAll
    +
    +class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll {
    +
    +  override def beforeAll: Unit = {
    +    sql("drop table if exists mainTable")
    +    sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
    +    sql("create datamap agg0 on table mainTable using 'preaggregate' as select name,count(age) from mainTable group by name")
    +    sql("create datamap agg1 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end) from mainTable group by name")
    +    sql("create datamap agg2 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end),city from mainTable group by name,city")
    +    sql("create datamap agg3 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end) from mainTable group by name")
    +    sql("create datamap agg4 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from mainTable group by name")
    +    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
    +  }
    +
    +  test("test pre agg create table with expression 1") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0"), true, "maintable_age_count")
    +  }
    +
    +  test("test pre agg create table with expression 2") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg1"), true, "maintable_column_0_sum")
    +  }
    +
    +  test("test pre agg create table with expression 3") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg2"), true, "maintable_column_0_sum")
    +  }
    +
    +  test("test pre agg create table with expression 4") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg3"), true, "maintable_column_0_sum")
    +  }
    +
    +  test("test pre agg create table with expression 5") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_0_sum")
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_1_sum")
    +  }
    +
    +  test("test pre agg table selection with expression 1") {
    +    val df = sql("select name as NewName, count(age) as sum from mainTable group by name order by name")
    +    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
    +  }
    +
    +
    +  test("test pre agg table selection with expression 2") {
    +    val df = sql("select name as NewName, sum(case when age=35 then id else 0 end) as sum from mainTable group by name order by name")
    +    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
    +  }
    +
    +  test("test pre agg table selection with expression 3") {
    +    val df = sql("select sum(case when age=35 then id else 0 end) from maintable")
    +    checkAnswer(df, Seq(Row(6.0)))
    +  }
    +
    +  test("test pre agg table selection with expression 4") {
    +    val df = sql("select sum(case when age=27 then id else 0 end) from maintable")
    +    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg3")
    +    checkAnswer(df, Seq(Row(2.0)))
    +  }
    +
    +  test("test pre agg table selection with expression 5") {
    +    val df = sql("select sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from maintable")
    +    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg4")
    +    checkAnswer(df, Seq(Row(2.0,6.0)))
    +  }
    +
    +  def preAggTableValidator(plan: LogicalPlan, actualTableName: String) : Unit ={
    --- End diff --
   
    Add a comment for this function.


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][Pre-Aggregate] Expression s...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r158934260
 
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala ---
    @@ -0,0 +1,105 @@
    +  test("test pre agg table selection with expression 5") {
    +    val df = sql("select sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from maintable")
    +    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg4")
    +    checkAnswer(df, Seq(Row(2.0,6.0)))
    +  }
    +
    --- End diff --
   
    Please also add a test case for a subquery.


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][Pre-Aggregate] Expression s...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r158934363
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -126,16 +127,17 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
                 aggregateExp,
                 carbonTable,
                 tableName,
    -            list)
    -          carbonTable
    +            list,
    +            aggregateExpressions)
    +          (carbonTable, logicalRelation)
     
             // below case for handling filter query
             // When plan has grouping expression, aggregate expression
             // filter expression
             case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    +        aggregateExp,
    --- End diff --
   
    unnecessary change


---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2421/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1197/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kumarvishal09 commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    Retest this please


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2612/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2426/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1202/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2625/



---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159051463
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java ---
    @@ -62,6 +62,12 @@
        */
       private int ordinal = Integer.MAX_VALUE;
     
    +  /**
    --- End diff --
   
    Remove unnecessary attributes such as parentColumnToAggregationsMapping.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159052141
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -330,6 +207,264 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         }
       }
     
    +  /**
    +   * Below method will be used to validate the logical plan
    +   * and collect all the details needed to select the proper aggregate table
    +   * @param logicalPlan
    +   *                    actual query logical plan
    +   * @param list
    +   *             list of projection columns present in the plan
    +   * @param qAggExprs
    +   *                  list of aggregate expressions
    +   * @return whether the plan is valid for transformation, the parent table and the parent logical relation
    +   */
    +  def validatePlanAndGetFields(logicalPlan: LogicalPlan,
    +      list: scala.collection.mutable.HashSet[QueryColumn],
    +      qAggExprs: scala.collection.mutable.HashSet[AggregateExpression]): (Boolean,
    +    CarbonTable, LogicalRelation) = {
    +    var isValidPlan = false
    +    var pTable: CarbonTable = null
    +    var qLRelation: LogicalRelation = null
    +    logicalPlan.transform {
    +      // to handle filter expression
    +      case filter@Filter(filterExp,
    +      CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    +        // only carbon query plans are supported; check whether the logical relation
    +        // is for carbon
    +        if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    +           logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    +             metaData.hasAggregateDataMapSchema =>
    +        qLRelation = logicalRelation
    +        pTable = getCarbonTableAndTableName(logicalRelation)
    +        // getting the columns from filter expression
    +        if (!CarbonReflectionUtils.hasPredicateSubquery(filterExp)) {
    +          isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, pTable)
    +        }
    +        filter
    +        // to handle aggregate expression
    +      case agg@Aggregate(groupingExp,
    +      aggregateExp,
    +      CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    +        // only carbon query plans are supported; check whether the logical relation
    +        // is for carbon
    +        if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    +           logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    +             metaData.hasAggregateDataMapSchema =>
    +        qLRelation = logicalRelation
    +        pTable = getCarbonTableAndTableName(logicalRelation)
    +        isValidPlan = extractQueryColumnsFromAggExpression(
    +          groupingExp,
    +          aggregateExp,
    +          pTable,
    +          list,
    +          qAggExprs)
    +        agg
    +        // to handle aggregate expression with filter
    +      case agg@Aggregate(grExp, aggExp, filter@Filter(_, _)) =>
    +        val out = validatePlanAndGetFields(filter, list, qAggExprs)
    +        pTable = out._2
    +        qLRelation = out._3
    +        isValidPlan = out._1
    +        if (isValidPlan) {
    +          isValidPlan = extractQueryColumnsFromAggExpression(grExp, aggExp, pTable, list, qAggExprs)
    +        }
    +        agg
    +        // to handle projection with order by
    +      case proj@Project(projectList, sort@Sort(_, _, _)) =>
    +        val out = validatePlanAndGetFields(sort, list, qAggExprs)
    +        pTable = out._2
    +        qLRelation = out._3
    +        isValidPlan = out._1
    +        if(isValidPlan) {
    +          list ++= extractQueryColumnForOrderBy(Some(projectList), Seq.empty, pTable)
    +        }
    +        proj
    +        // to handle only projection
    +      case proj@Project(projectList, agg@Aggregate(_, _, _)) =>
    +        val out = validatePlanAndGetFields(agg, list, qAggExprs)
    +        pTable = out._2
    +        qLRelation = out._3
    +        isValidPlan = out._1
    +        if(isValidPlan) {
    +          list ++= extractQueryColumnForOrderBy(Some(projectList), Seq.empty, pTable)
    +        }
    +        proj
    +      // case for handling aggregation with order by when only projection columns exist
    +      case sort@Sort(sortOrders, _, agg@Aggregate(_, _, _)) =>
    +        val out = validatePlanAndGetFields(agg, list, qAggExprs)
    +        pTable = out._2
    +        qLRelation = out._3
    +        isValidPlan = out._1
    +        if(isValidPlan) {
    +          list ++=
    +          extractQueryColumnForOrderBy(None, sortOrders, pTable)
    +        }
    +        sort
    +    }
    +    (isValidPlan, pTable, qLRelation)
    +  }
    +
    +  /**
    +   * Below method will be used to validate aggregate expression with the data map
    +   * and will return the selected valid data maps
    +   * @param selectedDataMap
    +   *                        list of data maps
    +   * @param carbonTable
    +   *                    parent carbon table
    +   * @param logicalRelation
    +   *                        parent logical relation
    +   * @param queryAggExpLogicalPlans
    +   *                                query agg expression logical plan
    +   * @return valid data map
    +   */
    +  def validateAggregateExpression(selectedDataMap: Seq[DataMapSchema],
    +      carbonTable: CarbonTable,
    +      logicalRelation: LogicalRelation,
    +      queryAggExpLogicalPlans: Seq[LogicalPlan]): Seq[DataMapSchema] = {
    +    def validateDataMap(dataMap: DataMapSchema,
    +        aggExpLogicalPlans: Seq[LogicalPlan]): Boolean = {
    +      val schemaAggLogicalPlan = getLogicalPlanForAggregateExpression(dataMap,
    +        carbonTable,
    +        logicalRelation)
    +      aggExpLogicalPlans.forall{
    +        p => schemaAggLogicalPlan.exists(m => p.sameResult(m._1))
    +      }
    +    }
    +    val selectedDataMapSchema = selectedDataMap.collect {
    +      case dataMap if validateDataMap(dataMap, queryAggExpLogicalPlans) =>
    +        dataMap
    +    }
    +    selectedDataMapSchema
    +  }
    +
    +  /**
    +   * Below method will be used to update the logical plan of expression
    +   * with parent table logical relation
    +   * @param logicalPlan
    +   * @param logicalRelation
    +   * @return
    +   */
    +  def updateLogicalRelation(logicalPlan: LogicalPlan,
    +      logicalRelation: LogicalRelation): LogicalPlan = {
    +    logicalPlan transform {
    +      case l: LogicalRelation =>
    +        l.copy(relation = logicalRelation.relation)
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to get the logical plan for each aggregate expression in
    +   * the child data map and its column schema mapping. If the mapping is already present
    +   * it will be reused; otherwise it will be generated and stored in the aggregation data map
    +   * @param selectedDataMap
    +   *                        child data map
    +   * @param carbonTable
    +   *                    parent table
    +   * @param logicalRelation
    +   *                        logical relation of actual plan
    +   * @return map of logical plan for each aggregate expression in child query and its column mapping
    +   */
    +  def getLogicalPlanForAggregateExpression(selectedDataMap: DataMapSchema, carbonTable: CarbonTable,
    +      logicalRelation: LogicalRelation): Map[LogicalPlan, ColumnSchema] = {
    +    val aggDataMapSchema = selectedDataMap.asInstanceOf[AggregationDataMapSchema]
    +    // if column mapping is not present
    +    if (null == aggDataMapSchema.getAggregateExpressionToColumnMapping) {
    +      // add preAGG UDF so that none of the PreAggregate rules are applied
    +      val childDataMapQueryString = new CarbonSpark2SqlParser()
    +        .addPreAggFunction(aggDataMapSchema.getProperties.get("CHILD_SELECT QUERY"))
    +      // get the logical plan
    +      val aggPlan = sparkSession.sql(childDataMapQueryString).logicalPlan
    +      // getting all aggregate expression from query
    +      val dataMapAggExp = getAggregateExpFromChildDataMap(aggPlan)
    +      // in case of average child table will have two columns which will be stored in sequence
    +      // so for average expression we need to get two columns for mapping
    +      var counter = 0
    +      // sorting the columns based on schema ordinal so search will give proper result
    +      val sortedColumnList = aggDataMapSchema.getChildSchema.getListOfColumns.asScala
    +        .sortBy(_.getSchemaOrdinal)
    +      val logicalPlanToColumnMapping = dataMapAggExp.map { aggExp =>
    +        // for each aggregate expression get logical plan
    +        val expLogicalPlan = getLogicalPlanFromAggExp(aggExp,
    +          carbonTable.getTableName,
    +          carbonTable.getDatabaseName, logicalRelation)
    +        // get the aggregate column of the child table for this expression,
    +        // searching from the current counter position
    +        val columnSchema = aggDataMapSchema
    +          .getAggColumnBasedOnIndex(counter, sortedColumnList.asJava)
    +        // move the counter past this column so that the search for the next
    +        // expression starts from the following schema ordinal
    +        counter = columnSchema.getSchemaOrdinal + 1
    +        (expLogicalPlan, columnSchema)
    +      }.toMap
    +      // store the mapping in data map
    +      aggDataMapSchema.setAggregateExpressionToColumnMapping(logicalPlanToColumnMapping.asJava)
    +      // return the mapping
    +      logicalPlanToColumnMapping
    +    } else {
    +      // if already present in data map then return the same
    +      aggDataMapSchema.getAggregateExpressionToColumnMapping
    +        .asInstanceOf[java.util.Map[LogicalPlan, ColumnSchema]].asScala.toMap
    +    }
    +  }
    +
    +
    +  /**
    +   * Below method will be used to get the logical plan from aggregate expression
    +   * @param aggExp
    +   *               aggregate expression
    +   * @param tableName
    +   *                  parent table name
    +   * @param databaseName
    +   *                     database name
    +   * @param logicalRelation
    +   *                        logical relation
    +   * @return logical plan
    +   */
    +  def getLogicalPlanFromAggExp(aggExp: AggregateExpression,
    +      tableName: String,
    +      databaseName: String,
    +      logicalRelation: LogicalRelation): LogicalPlan = {
    +    // adding the preAGG UDF, so pre aggregate data loading rule and query rule will not
    +    // be applied
    +    val query = new CarbonSpark2SqlParser()
    +      .addPreAggFunction(s"Select ${ aggExp.sql } from $databaseName.$tableName")
    +    // update the logical relation of the plan so that, when two logical plans
    +    // are compared, the relation does not cause a mismatch
    +    updateLogicalRelation(sparkSession.sql(query).logicalPlan, logicalRelation)
    +  }
    +
    +  /**
    +   * Below method will be used to get aggregate expression
    +   * @param logicalPlan
    +   *               logical plan
    +   * @return list of aggregate expression
    +   */
    +  def getAggregateExpFromChildDataMap(logicalPlan: LogicalPlan): Seq[AggregateExpression] = {
    +    val list = scala.collection.mutable.HashSet.empty[AggregateExpression]
    --- End diff --
   
    This should use a List or a LinkedHashSet, as insertion order is required for the mapping.
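The ordering concern can be reproduced without Spark. In the following sketch (hypothetical names; aggregate expressions are plain strings, and which child column belongs to which expression is assumed for illustration), expressions are collected into a `LinkedHashSet` and paired with the child table's aggregate columns sorted by schema ordinal, roughly the way the `counter` loop in `getLogicalPlanForAggregateExpression` walks the columns. Because a `LinkedHashSet` iterates in insertion order, the i-th collected expression is paired with the i-th column; a `mutable.HashSet` gives no iteration-order guarantee, so the same pairing could attach an expression to the wrong column. The sketch also uses `++=`, which appends in place, whereas `++` on a mutable set returns a new set and leaves the receiver unchanged.

```scala
import scala.collection.mutable

object OrderedAggMapping {
  // Hypothetical child-table column: name plus schema ordinal.
  final case class ChildColumn(name: String, ordinal: Int)

  // Pair each aggregate expression, in the order it was collected, with the
  // aggregate columns of the child table sorted by schema ordinal.
  def mapAggregatesToColumns(
      aggExprs: Iterable[String],
      aggColumns: Seq[ChildColumn]): Map[String, String] = {
    val sortedCols = aggColumns.sortBy(_.ordinal)
    aggExprs.iterator.zip(sortedCols.iterator).map { case (e, c) => e -> c.name }.toMap
  }

  // Collect expressions while preserving first-insertion order (duplicates dropped).
  def collectInOrder(exprs: Seq[String]): mutable.LinkedHashSet[String] = {
    val collected = mutable.LinkedHashSet.empty[String]
    // `++=` appends in place; `collected ++ exprs` would build a new set
    // and leave `collected` empty.
    collected ++= exprs
    collected
  }

  def main(args: Array[String]): Unit = {
    val exprs = Seq(
      "sum(CASE WHEN (age = 27) THEN id ELSE 0 END)",
      "sum(CASE WHEN (age = 35) THEN id ELSE 0 END)")
    // Columns deliberately listed out of ordinal order; ordinal 0 is assumed
    // to be the group-by column `name`, so the aggregates start at 1.
    val columns = Seq(
      ChildColumn("maintable_column_1_sum", 2),
      ChildColumn("maintable_column_0_sum", 1))
    val mapping = mapAggregatesToColumns(collectInOrder(exprs), columns)
    exprs.foreach(e => println(s"$e -> ${mapping(e)}"))
  }
}
```

Swapping `mutable.LinkedHashSet` for `mutable.HashSet` in `collectInOrder` keeps the code compiling but makes the pairing depend on hash order, which is the kind of silent mismatch the review comment is guarding against.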


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159053493
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -330,6 +207,264 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         }
       }
     
    +  /**
    +   * Below method will be used to validate the logical plan
    +   * and get all the details from to select proper aggregate table
    +   * @param logicalPlan
    +   *                    actual query logical plan
    +   * @param list
    +   *             list of projection column present in plan
    +   * @param qAggExprs
    +   *                  list of aggregate expression
    +   * @return if plan is valid for tranformation, parent table, parent logical relaion
    +   */
    +  def validatePlanAndGetFields(logicalPlan: LogicalPlan,
    +      list: scala.collection.mutable.HashSet[QueryColumn],
    +      qAggExprs: scala.collection.mutable.HashSet[AggregateExpression]): (Boolean,
    +    CarbonTable, LogicalRelation) = {
    +    var isValidPlan = false
    +    var pTable: CarbonTable = null
    +    var qLRelation: LogicalRelation = null
    +    logicalPlan.transform {
    +      // to handle filter expression
    +      case filter@Filter(filterExp,
    +      CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    +        // only carbon query plan is supported checking whether logical relation is
    +        // is for carbon
    +        if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    +           logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    +             metaData.hasAggregateDataMapSchema =>
    +        qLRelation = logicalRelation
    +        pTable = getCarbonTableAndTableName(logicalRelation)
    +        // getting the columns from filter expression
    +        if (!CarbonReflectionUtils.hasPredicateSubquery(filterExp)) {
    +          isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, pTable)
    +        }
    +        filter
    +        // to handle aggregate expression
    +      case agg@Aggregate(groupingExp,
    +      aggregateExp,
    +      CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    +        // only carbon query plan is supported checking whether logical relation is
    +        // is for carbon
    +        if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    +           logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    +             metaData.hasAggregateDataMapSchema =>
    +        qLRelation = logicalRelation
    +        pTable = getCarbonTableAndTableName(logicalRelation)
    +        isValidPlan = extractQueryColumnsFromAggExpression(
    +          groupingExp,
    +          aggregateExp,
    +          pTable,
    +          list,
    +          qAggExprs)
    +        agg
    +        // to handle aggregate expression with filter
    +      case agg@Aggregate(grExp, aggExp, filter@Filter(_, _)) =>
    +        val out = validatePlanAndGetFields(filter, list, qAggExprs)
    +        pTable = out._2
    +        qLRelation = out._3
    +        isValidPlan = out._1
    +        if (isValidPlan) {
    +          isValidPlan = extractQueryColumnsFromAggExpression(grExp, aggExp, pTable, list, qAggExprs)
    +        }
    +        agg
    +        // to handle projection with order by
    +      case proj@Project(projectList, sort@Sort(_, _, _)) =>
    +        val out = validatePlanAndGetFields(sort, list, qAggExprs)
    +        pTable = out._2
    +        qLRelation = out._3
    +        isValidPlan = out._1
    +        if(isValidPlan) {
    +          list ++ extractQueryColumnForOrderBy(Some(projectList), Seq.empty, pTable)
    +        }
    +        proj
    +        // to handle only projection
    +      case proj@Project(projectList, agg@Aggregate(_, _, _)) =>
    +        val out = validatePlanAndGetFields(agg, list, qAggExprs)
    +        pTable = out._2
    +        qLRelation = out._3
    +        isValidPlan = out._1
    +        if(isValidPlan) {
    +          list ++ extractQueryColumnForOrderBy(Some(projectList), Seq.empty, pTable)
    +        }
    +        proj
    +      // case for handling aggregation with order by when only projection column exists
    +      case sort@Sort(sortOrders, _, agg@Aggregate(_, _, _)) =>
    +        val out = validatePlanAndGetFields(agg, list, qAggExprs)
    +        pTable = out._2
    +        qLRelation = out._3
    +        isValidPlan = out._1
    +        if(isValidPlan) {
    +          list ++
    +          extractQueryColumnForOrderBy(None, sortOrders, pTable)
    +        }
    +        sort
    +    }
    +    (isValidPlan, pTable, qLRelation)
    +  }
    +
    +  /**
    +   * Below method will be used to validate aggregate expression with the data map
    +   * and will return the selected valid data maps
    +   * @param selectedDataMap
    +   *                        list of data maps
    +   * @param carbonTable
    +   *                    parent carbon table
    +   * @param logicalRelation
    +   *                        parent logical relation
    +   * @param queryAggExpLogicalPlans
    +   *                                query agg expression logical plan
    +   * @return valid data map
    +   */
    +  def validateAggregateExpression(selectedDataMap: Seq[DataMapSchema],
    +      carbonTable: CarbonTable,
    +      logicalRelation: LogicalRelation,
    +      queryAggExpLogicalPlans: Seq[LogicalPlan]): Seq[DataMapSchema] = {
    +    def validateDataMap(dataMap: DataMapSchema,
    +        aggExpLogicalPlans: Seq[LogicalPlan]): Boolean = {
    +      val schemaAggLogicalPlan = getLogicalPlanForAggregateExpression(dataMap,
    +        carbonTable,
    +        logicalRelation)
    +      aggExpLogicalPlans.forall{
    +        p => schemaAggLogicalPlan.exists(m => p.sameResult(m._1))
    +      }
    +    }
    +    val selectedDataMapSchema = selectedDataMap.collect {
    +      case dataMap if validateDataMap(dataMap, queryAggExpLogicalPlans) =>
    +        dataMap
    +    }
    +    selectedDataMapSchema
    +  }
    +
    +  /**
    +   * Below method will be used to update the logical plan of expression
    +   * with parent table logical relation
    +   * @param logicalPlan
    +   * @param logicalRelation
    +   * @return
    +   */
    +  def updateLogicalRelation(logicalPlan: LogicalPlan,
    +      logicalRelation: LogicalRelation): LogicalPlan = {
    +    logicalPlan transform {
    +      case l: LogicalRelation =>
    +        l.copy(relation = logicalRelation.relation)
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to get the logical plan for each aggregate expression in
    +   * the child data map and its column schema mapping. If the mapping is already present
    +   * it will be reused; otherwise it will be generated and stored in the aggregation data map
    +   * @param selectedDataMap
    +   *                        child data map
    +   * @param carbonTable
    +   *                    parent table
    +   * @param logicalRelation
    +   *                        logical relation of actual plan
    +   * @return map of logical plan for each aggregate expression in child query and its column mapping
    +   */
    +  def getLogicalPlanForAggregateExpression(selectedDataMap: DataMapSchema, carbonTable: CarbonTable,
    +      logicalRelation: LogicalRelation): Map[LogicalPlan, ColumnSchema] = {
    +    val aggDataMapSchema = selectedDataMap.asInstanceOf[AggregationDataMapSchema]
    +    // if column mapping is not present
    +    if (null == aggDataMapSchema.getAggregateExpressionToColumnMapping) {
    +      // add preAGG UDF to avoid all the PreAggregate rules
    +      val childDataMapQueryString = new CarbonSpark2SqlParser()
    +        .addPreAggFunction(aggDataMapSchema.getProperties.get("CHILD_SELECT QUERY"))
    +      // get the logical plan
    +      val aggPlan = sparkSession.sql(childDataMapQueryString).logicalPlan
    +      // getting all aggregate expression from query
    +      val dataMapAggExp = getAggregateExpFromChildDataMap(aggPlan)
    +      // in case of average child table will have two columns which will be stored in sequence
    +      // so for average expression we need to get two columns for mapping
    +      var counter = 0
    +      // sorting the columns based on schema ordinal so search will give proper result
    +      val sortedColumnList = aggDataMapSchema.getChildSchema.getListOfColumns.asScala
    +        .sortBy(_.getSchemaOrdinal)
    +      val logicalPlanToColumnMapping = dataMapAggExp.map { aggExp =>
    +        // for each aggregate expression get logical plan
    +        val expLogicalPlan = getLogicalPlanFromAggExp(aggExp,
    +          carbonTable.getTableName,
    +          carbonTable.getDatabaseName, logicalRelation)
    +        // check if aggregate expression is of type avg
    +        // get the columns
    +        var columnSchema = aggDataMapSchema
    +          .getAggColumnBasedOnIndex(counter, sortedColumnList.asJava)
    +        // increment the counter so that, for the next expression, the search
    +        // above starts after this column's schema ordinal
    +        counter = columnSchema.getSchemaOrdinal + 1
    +        (expLogicalPlan, columnSchema)
    +      }.toMap
    +      // store the mapping in data map
    +      aggDataMapSchema.setAggregateExpressionToColumnMapping(logicalPlanToColumnMapping.asJava)
    +      // return the mapping
    +      logicalPlanToColumnMapping
    +    } else {
    +      // if already present in data map then return the same
    +      aggDataMapSchema.getAggregateExpressionToColumnMapping
    +        .asInstanceOf[java.util.Map[LogicalPlan, ColumnSchema]].asScala.toMap
    +    }
    +  }
    +
    +
    +  /**
    +   * Below method will be used to get the logical plan from aggregate expression
    +   * @param aggExp
    +   *               aggregate expression
    +   * @param tableName
    +   *                  parent table name
    +   * @param databaseName
    +   *                     database name
    +   * @param logicalRelation
    +   *                        logical relation
    +   * @return logical plan
    +   */
    +  def getLogicalPlanFromAggExp(aggExp: AggregateExpression,
    +      tableName: String,
    +      databaseName: String,
    +      logicalRelation: LogicalRelation): LogicalPlan = {
    +    // adding the preAGG UDF, so pre aggregate data loading rule and query rule will not
    +    // be applied
    +    val query = new CarbonSpark2SqlParser()
    --- End diff --
   
    Don't create a new parser every time; pass it in from the caller.
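A minimal Scala sketch of the suggested change, assuming that constructing the parser is non-trivial and that one instance can safely be reused for every aggregate expression handled in a single rule invocation. `QueryParser` and its `addPreAggFunction` are hypothetical stand-ins for `CarbonSpark2SqlParser` (its real rewrite is not reproduced here), and the instance counter exists only to make the difference observable.

```scala
// Hypothetical stand-in for an expensive-to-build parser such as CarbonSpark2SqlParser.
// The companion object only counts how many parser instances were created.
object QueryParser {
  var instancesCreated = 0
}

class QueryParser {
  QueryParser.instancesCreated += 1

  // Hypothetical encoding: marks the query so that pre-aggregate rules would skip it.
  def addPreAggFunction(query: String): String = s"preAgg($query)"
}

object ParserReuse {
  // Pattern flagged in the review: a fresh parser for every expression.
  def rewriteEachWithNewParser(queries: Seq[String]): Seq[String] =
    queries.map(q => new QueryParser().addPreAggFunction(q))

  // Suggested pattern: the caller creates one parser and passes it down.
  def rewriteEach(queries: Seq[String], parser: QueryParser): Seq[String] =
    queries.map(parser.addPreAggFunction)
}
```

In the diff above, the caller (for example `getLogicalPlanForAggregateExpression`, which maps over the data map's aggregate expressions) could create the parser once and hand it to `getLogicalPlanFromAggExp` as an extra parameter.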


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159056348
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -330,6 +207,264 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         }
       }
     
    +  /**
    +   * Below method will be used to validate the logical plan
    +   * and get all the details from to select proper aggregate table
    +   * @param logicalPlan
    +   *                    actual query logical plan
    +   * @param list
    +   *             list of projection column present in plan
    +   * @param qAggExprs
    +   *                  list of aggregate expression
    +   * @return whether the plan is valid for transformation, parent table, parent logical relation
    +   */
    +  def validatePlanAndGetFields(logicalPlan: LogicalPlan,
    +      list: scala.collection.mutable.HashSet[QueryColumn],
    +      qAggExprs: scala.collection.mutable.HashSet[AggregateExpression]): (Boolean,
    +    CarbonTable, LogicalRelation) = {
    +    var isValidPlan = false
    +    var pTable: CarbonTable = null
    +    var qLRelation: LogicalRelation = null
    +    logicalPlan.transform {
    --- End diff --
   
    It seems the case of a Join between two plans is not handled here. Please also handle the Join and Union cases using recursion.
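A simplified sketch of the recursion the reviewer asks for. It uses a hypothetical toy plan tree rather than Spark's `LogicalPlan`, and a hypothetical `candidateTables` function rather than the PR's `validatePlanAndGetFields`. Since that method returns a single `(Boolean, CarbonTable, LogicalRelation)` tuple, it can describe only one relation, whereas recursing into each child of a `Join` or `Union` yields one result per branch.

```scala
// Hypothetical, simplified plan tree; not Spark's LogicalPlan.
sealed trait Plan
case class Relation(table: String, hasPreAggDataMap: Boolean) extends Plan
case class Filter(condition: String, child: Plan) extends Plan
case class Aggregate(groupBy: Seq[String], aggs: Seq[String], child: Plan) extends Plan
case class Project(columns: Seq[String], child: Plan) extends Plan
case class Join(left: Plan, right: Plan) extends Plan
case class Union(children: Seq[Plan]) extends Plan

object PlanValidation {
  // Collects the tables of aggregates that sit directly on a (possibly filtered)
  // relation with a pre-aggregate data map. Join and Union recurse into every child,
  // so each branch is validated independently instead of overwriting shared state.
  def candidateTables(plan: Plan): Seq[String] = plan match {
    case Aggregate(_, _, Relation(t, true))            => Seq(t)
    case Aggregate(_, _, Filter(_, Relation(t, true))) => Seq(t)
    case Join(left, right)     => candidateTables(left) ++ candidateTables(right)
    case Union(children)       => children.flatMap(candidateTables)
    case Project(_, child)     => candidateTables(child)
    case Filter(_, child)      => candidateTables(child)
    case Aggregate(_, _, child) => candidateTables(child)
    case Relation(_, _)        => Seq.empty
  }
}
```

Whether a Join or Union that only partly matches should be rewritten branch by branch is not settled by this sketch.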


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159057341
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -282,6 +136,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             val listFilterColumn = list
               .filter(queryColumn => queryColumn.getAggFunction.isEmpty && queryColumn.isFilterColumn)
               .toList
    +        val isProjectionColumnPresent = (listProjectionColumn.size + listFilterColumn.size) > 0
             // getting all the aggregation columns
             val listAggregationColumn = list.filter(queryColumn => !queryColumn.getAggFunction.isEmpty)
    --- End diff --
   
    This code is now unused; please remove all code related to it.


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159058055
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -1124,92 +1328,57 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
        *
        * @param carbonTable
        * parent table
    -   * @param aggFunctions
    -   * aggregation function
    -   * @param tableName
    -   * parent table name
    +   * @param aggExp
    +   * aggregate expression
        * @return list of fields
        */
       def validateAggregateFunctionAndGetFields(carbonTable: CarbonTable,
    -      aggFunctions: AggregateFunction,
    -      tableName: String
    -  ): Seq[QueryColumn] = {
    +      aggExp: AggregateExpression): Seq[AggregateExpression] = {
         val changedDataType = true
    --- End diff --
   
    Please remove the unused variable.


---