[GitHub] carbondata pull request #1728: [CARBONDATA-1926] Expression support inside a...


[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159058395
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -933,121 +1143,122 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
        * child data map schema
        * @param attributes
        * child logical relation
    +   * @param expLogicalPlanToColumnSchemaMapping
    +   * expression logical plan to data map column mapping
    +   * @param parentTable
    +   * parent carbon table
    +   * @param logicalRelation
    +   * logical relation
        * @return updated expression
        */
       def getUpdatedAggregateExpressionForChild(aggExp: AggregateExpression,
    -      dataMapSchema: DataMapSchema,
    -      attributes: Seq[AttributeReference]):
    +      dataMapSchema: AggregationDataMapSchema,
    +      attributes: Seq[AttributeReference],
    +      expLogicalPlanToColumnSchemaMapping: Option[Map[LogicalPlan, ColumnSchema]],
    +      parentTable: CarbonTable,
    +      logicalRelation: LogicalRelation):
       Expression = {
    +    val updatedAggExp = getUpdateAggregateExpressions(aggExp)
    --- End diff --
   
    Please add a code comment here.


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159058689
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -294,12 +149,30 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             val aggregateTableSelector = new AggregateTableSelector(queryPlan, carbonTable)
             // select the list of valid child tables
             val selectedDataMapSchemas = aggregateTableSelector.selectPreAggDataMapSchema()
    +        // query has only aggregate expression then selected data map will be empty
    +        // the validate all the child data map otherwise validate selected data map
    +        var selectedAggMaps = if (isProjectionColumnPresent) {
    +          selectedDataMapSchemas
    +        } else {
    +          carbonTable.getTableInfo.getDataMapSchemaList
    +        }
    +        val aggExpLogicalPlans = aggregateExpressions.map { queryAggExp =>
    +          getLogicalPlanFromAggExp(queryAggExp,
    +            carbonTable.getTableName,
    +            carbonTable.getDatabaseName, logicalRelation)
    +        }.toSeq
    +        if(aggregateExpressions.size > 0 && selectedAggMaps.size > 0) {
    --- End diff --
   
    Please add comments here.


---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1219/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2451/



---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kumarvishal09 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159378009
 
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala ---
    @@ -0,0 +1,105 @@
    +package org.apache.carbondata.integration.spark.testsuite.preaggregate
    +
    +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Row}
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.datasources.LogicalRelation
    +import org.apache.spark.sql.hive.CarbonRelation
    +import org.apache.spark.sql.test.util.QueryTest
    +import org.scalatest.BeforeAndAfterAll
    +
    +class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll {
    +
    +  override def beforeAll: Unit = {
    +    sql("drop table if exists mainTable")
    +    sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
    +    sql("create datamap agg0 on table mainTable using 'preaggregate' as select name,count(age) from mainTable group by name")
    +    sql("create datamap agg1 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end) from mainTable group by name")
    +    sql("create datamap agg2 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end),city from mainTable group by name,city")
    +    sql("create datamap agg3 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end) from mainTable group by name")
    +    sql("create datamap agg4 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from mainTable group by name")
    +    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
    +  }
    +
    +  test("test pre agg create table with expression 1") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0"), true, "maintable_age_count")
    +  }
    +
    +  test("test pre agg create table with expression 2") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg1"), true, "maintable_column_0_sum")
    +  }
    +
    +  test("test pre agg create table with expression 3") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg2"), true, "maintable_column_0_sum")
    +  }
    +
    +  test("test pre agg create table with expression 4") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg3"), true, "maintable_column_0_sum")
    +  }
    +
    +  test("test pre agg create table with expression 5") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_0_sum")
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_1_sum")
    +  }
    +
    +  test("test pre agg table selection with expression 1") {
    +    val df = sql("select name as NewName, count(age) as sum from mainTable group by name order by name")
    +    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
    +  }
    +
    +
    +  test("test pre agg table selection with expression 2") {
    +    val df = sql("select name as NewName, sum(case when age=35 then id else 0 end) as sum from mainTable group by name order by name")
    +    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
    +  }
    +
    +  test("test pre agg table selection with expression 3") {
    +    val df = sql("select sum(case when age=35 then id else 0 end) from maintable")
    +    checkAnswer(df, Seq(Row(6.0)))
    +  }
    +
    +  test("test pre agg table selection with expression 4") {
    +    val df = sql("select sum(case when age=27 then id else 0 end) from maintable")
    +    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg3")
    +    checkAnswer(df, Seq(Row(2.0)))
    +  }
    +
    +  test("test pre agg table selection with expression 5") {
    +    val df = sql("select sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from maintable")
    +    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg4")
    +    checkAnswer(df, Seq(Row(2.0,6.0)))
    +  }
    +
    --- End diff --
   
    add TestPreAggregateWithSubQuery


---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2508/



---

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1283/



---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159435962
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -18,28 +18,46 @@
     package org.apache.spark.sql.hive
     
     import scala.collection.JavaConverters._
    -import scala.collection.mutable.ArrayBuffer
    +import scala.collection.mutable
     
    -import org.apache.spark.SPARK_VERSION
     import org.apache.spark.sql._
    -import org.apache.spark.sql.CarbonExpressions.{CarbonScalaUDF, CarbonSubqueryAlias, MatchCast, MatchCastExpression}
     import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.CarbonExpressions.{CarbonScalaUDF, CarbonSubqueryAlias, MatchCastExpression}
     import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute}
    -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Divide, Expression, Literal, NamedExpression, ScalaUDF, SortOrder}
    -import org.apache.spark.sql.catalyst.expressions.aggregate._
    +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, Divide, Expression, Literal, NamedExpression, ScalaUDF, SortOrder}
    +import org.apache.spark.sql.catalyst.expressions.aggregate.{Count, _}
     import org.apache.spark.sql.catalyst.plans.logical._
     import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
     import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation}
    +import org.apache.spark.sql.parser.CarbonSpark2SqlParser
     import org.apache.spark.sql.types._
    -import org.apache.spark.sql.util.CarbonException
     import org.apache.spark.util.CarbonReflectionUtils
     
     import org.apache.carbondata.core.constants.CarbonCommonConstants
     import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema}
    +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
     import org.apache.carbondata.core.preagg.{AggregateTableSelector, QueryColumn, QueryPlan}
     import org.apache.carbondata.core.util.{CarbonUtil, ThreadLocalSessionInfo}
    -import org.apache.carbondata.spark.util.CarbonScalaUtil
     
    +/**
    + * model class to store aggregate expression logical plan
    + * and its column schema mapping
    + * @param logicalPlan
    + * logical plan of aggregate expression
    + * @param columnSchema
    + * column schema from table
    + */
    +case class AggExpToColumnMappingModel(var logicalPlan: LogicalPlan,
    --- End diff --
   
    Move `var logicalPlan: LogicalPlan` to the next line.


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159438073
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             needAnalysis = false
             attr
         }
    +    if(needAnalysis) {
    +      needAnalysis = isValidPlan(plan)
    +    }
         // if plan is not valid for transformation then return same plan
         if (!needAnalysis) {
           plan
         } else {
    -      // create buffer to collect all the column and its metadata information
    -      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    -      var isValidPlan = true
    -      val carbonTable = plan match {
    -        // matching the plan based on supported plan
    -        // if plan is matches with any case it will validate and get all
    -        // information required for transforming the plan
    +      val updatedPlan = transformPreAggQueryPlan(plan)
    +      val newPlan = updatePlan(updatedPlan)
    +      print(newPlan.toString())
    +      newPlan
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // subquery
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    +  /**
    +   * Below method will be used to update the child plan
    +   * This will be used for updating expression like join condition,
    +   * order by, project list etc
    +   * @param plan
    +   * child plan
    +   * @return updated plan
    +   */
    +  def updatePlan(plan: LogicalPlan) : LogicalPlan = {
    +    val updatedPlan = plan transform {
    +      case Aggregate(grp, aggExp, child) =>
    +        Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
    +      case Filter(filterExp, child) =>
    +        Filter(updateConditionExpression(Some(filterExp)).get, child)
    +      case Project(projectList, child) =>
    +        Project(updateNamedExpression(projectList), child)
    +      case Sort(sortOrders, global, child) =>
    +        Sort(updateSortExpression(sortOrders), global, child)
    +      case Join(left, right, joinType, condition) =>
    +        Join(left, right, joinType, updateConditionExpression(condition))
    +    }
    +    updatedPlan
    +  }
     
    -        // below case for handling filter query
    -        // When plan has grouping expression, aggregate expression
    -        // filter expression
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          // getting the columns from filter expression
    -          if(isValidPlan) {
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +  /**
    +   * Below method will be used to update the sort expression
    +   * @param sortExp
    +   * sort order expression in query
    +   * @return updated sort expression
    +   */
    +  def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
    +     sortExp map { order =>
    +      order.child match {
    --- End diff --
   
    Better to use `transform` instead of `match` here: if the sort expression is wrapped in an `Alias`, the `match` on `order.child` will not work.
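
To illustrate the point of this comment, here is a minimal sketch that does not use Spark at all. `Expr`, `Attr`, `Alias`, the `mapping` table and the `SortRewrite` object are hypothetical stand-ins invented for this example, not Catalyst classes, and the `transform` method is a simplified top-down rewrite written only to mimic the general idea. It shows why a match on the top-level node misses an attribute nested under an alias, while a tree rewrite reaches it.

```scala
// Simplified stand-in for an expression tree; NOT the Spark Catalyst API.
sealed trait Expr {
  // Top-down rewrite: apply the rule to this node if it matches,
  // then recurse into the children of the result.
  def transform(rule: PartialFunction[Expr, Expr]): Expr = {
    val afterRule: Expr = if (rule.isDefinedAt(this)) rule(this) else this
    afterRule match {
      case Alias(child, name) => Alias(child.transform(rule), name)
      case leaf => leaf
    }
  }
}
case class Attr(name: String) extends Expr
case class Alias(child: Expr, name: String) extends Expr

object SortRewrite {
  // Hypothetical mapping from parent-table columns to data map columns.
  val mapping: Map[String, String] = Map("name" -> "maintable_name")

  // Match on the top-level node only: rewrites a bare attribute,
  // but leaves an attribute wrapped in an Alias untouched.
  def viaMatch(e: Expr): Expr = e match {
    case Attr(n) if mapping.contains(n) => Attr(mapping(n))
    case other => other
  }

  // Tree rewrite: the rule is tried on every node, so the attribute
  // is replaced wherever it occurs.
  def viaTransform(e: Expr): Expr = e.transform {
    case Attr(n) if mapping.contains(n) => Attr(mapping(n))
  }
}
```

With these definitions, `SortRewrite.viaMatch(Alias(Attr("name"), "n"))` returns its input unchanged, whereas `SortRewrite.viaTransform` on the same input rewrites the inner attribute and keeps the alias.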


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159438250
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             needAnalysis = false
             attr
         }
    +    if(needAnalysis) {
    +      needAnalysis = isValidPlan(plan)
    +    }
         // if plan is not valid for transformation then return same plan
         if (!needAnalysis) {
           plan
         } else {
    -      // create buffer to collect all the column and its metadata information
    -      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    -      var isValidPlan = true
    -      val carbonTable = plan match {
    -        // matching the plan based on supported plan
    -        // if plan is matches with any case it will validate and get all
    -        // information required for transforming the plan
    +      val updatedPlan = transformPreAggQueryPlan(plan)
    +      val newPlan = updatePlan(updatedPlan)
    +      print(newPlan.toString())
    +      newPlan
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // subquery
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    +  /**
    +   * Below method will be used to update the child plan
    +   * This will be used for updating expression like join condition,
    +   * order by, project list etc
    +   * @param plan
    +   * child plan
    +   * @return updated plan
    +   */
    +  def updatePlan(plan: LogicalPlan) : LogicalPlan = {
    +    val updatedPlan = plan transform {
    +      case Aggregate(grp, aggExp, child) =>
    +        Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
    +      case Filter(filterExp, child) =>
    +        Filter(updateConditionExpression(Some(filterExp)).get, child)
    +      case Project(projectList, child) =>
    +        Project(updateNamedExpression(projectList), child)
    +      case Sort(sortOrders, global, child) =>
    +        Sort(updateSortExpression(sortOrders), global, child)
    +      case Join(left, right, joinType, condition) =>
    +        Join(left, right, joinType, updateConditionExpression(condition))
    +    }
    +    updatedPlan
    +  }
     
    -        // below case for handling filter query
    -        // When plan has grouping expression, aggregate expression
    -        // filter expression
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          // getting the columns from filter expression
    -          if(isValidPlan) {
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +  /**
    +   * Below method will be used to update the sort expression
    +   * @param sortExp
    +   * sort order expression in query
    +   * @return updated sort expression
    +   */
    +  def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
    +     sortExp map { order =>
    +      order.child match {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    --- End diff --
   
    Better to use `match { case ... }` instead of `if`/`else`.
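
A minimal sketch of the suggested style, using plain strings in place of attribute references. The `OptionMatch` object, the string pairs in `updatedExpression`, and the fallback of returning the original attribute when nothing is found are all assumptions made for illustration; the diff above does not show the `else` branch.

```scala
object OptionMatch {
  // Hypothetical stand-in for the (parent attribute, child attribute) pairs.
  val updatedExpression: Seq[(String, String)] = Seq("name" -> "maintable_name")

  // Style flagged in the review: isDefined followed by get.
  def lookupIfElse(attr: String): String = {
    val childExp = updatedExpression.find { p => p._1 == attr }
    if (childExp.isDefined) childExp.get._2 else attr
  }

  // Suggested style: pattern match on the Option, which destructures the
  // pair directly and makes the None case explicit.
  def lookupMatch(attr: String): String =
    updatedExpression.find { p => p._1 == attr } match {
      case Some((_, child)) => child
      case None => attr // assumed fallback: keep the original attribute
    }
}
```

Both functions return the same result for every input; the pattern-matching version avoids the repeated `childExp.get` calls.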


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159438754
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             needAnalysis = false
             attr
         }
    +    if(needAnalysis) {
    +      needAnalysis = isValidPlan(plan)
    +    }
         // if plan is not valid for transformation then return same plan
         if (!needAnalysis) {
           plan
         } else {
    -      // create buffer to collect all the column and its metadata information
    -      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    -      var isValidPlan = true
    -      val carbonTable = plan match {
    -        // matching the plan based on supported plan
    -        // if plan is matches with any case it will validate and get all
    -        // information required for transforming the plan
    +      val updatedPlan = transformPreAggQueryPlan(plan)
    +      val newPlan = updatePlan(updatedPlan)
    +      print(newPlan.toString())
    +      newPlan
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // subquery
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    +  /**
    +   * Below method will be used to update the child plan
    +   * This will be used for updating expression like join condition,
    +   * order by, project list etc
    +   * @param plan
    +   * child plan
    +   * @return updated plan
    +   */
    +  def updatePlan(plan: LogicalPlan) : LogicalPlan = {
    +    val updatedPlan = plan transform {
    +      case Aggregate(grp, aggExp, child) =>
    +        Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
    +      case Filter(filterExp, child) =>
    +        Filter(updateConditionExpression(Some(filterExp)).get, child)
    +      case Project(projectList, child) =>
    +        Project(updateNamedExpression(projectList), child)
    +      case Sort(sortOrders, global, child) =>
    +        Sort(updateSortExpression(sortOrders), global, child)
    +      case Join(left, right, joinType, condition) =>
    +        Join(left, right, joinType, updateConditionExpression(condition))
    +    }
    +    updatedPlan
    +  }
     
    -        // below case for handling filter query
    -        // When plan has grouping expression, aggregate expression
    -        // filter expression
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          // getting the columns from filter expression
    -          if(isValidPlan) {
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +  /**
    +   * Below method will be used to update the sort expression
    +   * @param sortExp
    +   * sort order expression in query
    +   * @return updated sort expression
    +   */
    +  def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
    +     sortExp map { order =>
    +      order.child match {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    --- End diff --
   
    Why do you use `attr.qualifier` instead of `childExp.qualifier`?


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159440196
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             needAnalysis = false
             attr
         }
    +    if(needAnalysis) {
    +      needAnalysis = isValidPlan(plan)
    +    }
         // if plan is not valid for transformation then return same plan
         if (!needAnalysis) {
           plan
         } else {
    -      // create buffer to collect all the column and its metadata information
    -      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    -      var isValidPlan = true
    -      val carbonTable = plan match {
    -        // matching the plan based on supported plan
    -        // if plan is matches with any case it will validate and get all
    -        // information required for transforming the plan
    +      val updatedPlan = transformPreAggQueryPlan(plan)
    +      val newPlan = updatePlan(updatedPlan)
    +      print(newPlan.toString())
    +      newPlan
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // subquery
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    +  /**
    +   * Below method will be used to update the child plan
    +   * This will be used for updating expression like join condition,
    +   * order by, project list etc
    +   * @param plan
    +   * child plan
    +   * @return updated plan
    +   */
    +  def updatePlan(plan: LogicalPlan) : LogicalPlan = {
    +    val updatedPlan = plan transform {
    +      case Aggregate(grp, aggExp, child) =>
    +        Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
    +      case Filter(filterExp, child) =>
    +        Filter(updateConditionExpression(Some(filterExp)).get, child)
    +      case Project(projectList, child) =>
    +        Project(updateNamedExpression(projectList), child)
    +      case Sort(sortOrders, global, child) =>
    +        Sort(updateSortExpression(sortOrders), global, child)
    +      case Join(left, right, joinType, condition) =>
    +        Join(left, right, joinType, updateConditionExpression(condition))
    +    }
    +    updatedPlan
    +  }
     
    -        // below case for handling filter query
    -        // When plan has grouping expression, aggregate expression
    -        // filter expression
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          // getting the columns from filter expression
    -          if(isValidPlan) {
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +  /**
    +   * Below method will be used to update the sort expression
    +   * @param sortExp
    +   * sort order expression in query
    +   * @return updated sort expression
    +   */
    +  def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
    +     sortExp map { order =>
    +      order.child match {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            SortOrder(newExpression, order.direction)
    +          } else {
    +            SortOrder(attr, order.direction)
               }
    -          carbonTable
    +      }
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // logical relation
    -        case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    -        // case for handling aggregation, order by
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -          }
    -          carbonTable
    -        // case for handling aggregation, order by and filter
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    +  /**
    +   * Below method will be used to update the expression like group by expression
    +   * @param expressions
    +   * sequence of expression like group by
    +   * @return updated expressions
    +   */
    +  def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
    +    val newExp = expressions map { expression =>
    +      expression transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    --- End diff --
   
    Better to use match { case } here instead of if/else.
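
A minimal sketch of what this suggestion could look like. It assumes a hypothetical `Attr` case class in place of Spark's `AttributeReference`, and a plain `Map` standing in for `updatedExpression`; neither is the PR's actual code. The `Option` returned by `find` is matched directly, so no `isDefined` / `.get` pair is needed.

```scala
object MatchInsteadOfIfElse {
  // Hypothetical stand-in for Spark's AttributeReference (assumption, not the Catalyst class).
  final case class Attr(name: String, exprId: Long)

  // Hypothetical mapping from parent-table attributes to child (pre-aggregate) attributes.
  val updatedExpression: Map[Attr, Attr] =
    Map(Attr("price", 1L) -> Attr("price_sum", 7L))

  // Look up the child attribute by expression id and pattern match on the result.
  def resolve(attr: Attr): Attr =
    updatedExpression.find { case (parent, _) => parent.exprId == attr.exprId } match {
      case Some((_, child)) => child // a mapping exists: use the child attribute
      case None             => attr  // no mapping: keep the original attribute
    }
}
```

The `None` branch makes the fallback explicit, which an `if (childExp.isDefined)` check followed by repeated `childExp.get` calls tends to obscure.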


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159440457
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             needAnalysis = false
             attr
         }
    +    if(needAnalysis) {
    +      needAnalysis = isValidPlan(plan)
    +    }
         // if plan is not valid for transformation then return same plan
         if (!needAnalysis) {
           plan
         } else {
    -      // create buffer to collect all the column and its metadata information
    -      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    -      var isValidPlan = true
    -      val carbonTable = plan match {
    -        // matching the plan based on supported plan
    -        // if plan is matches with any case it will validate and get all
    -        // information required for transforming the plan
    +      val updatedPlan = transformPreAggQueryPlan(plan)
    +      val newPlan = updatePlan(updatedPlan)
    +      print(newPlan.toString())
    +      newPlan
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // subquery
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    +  /**
    +   * Below method will be used to update the child plan
    +   * This will be used for updating expression like join condition,
    +   * order by, project list etc
    +   * @param plan
    +   * child plan
    +   * @return updated plan
    +   */
    +  def updatePlan(plan: LogicalPlan) : LogicalPlan = {
    +    val updatedPlan = plan transform {
    +      case Aggregate(grp, aggExp, child) =>
    +        Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
    +      case Filter(filterExp, child) =>
    +        Filter(updateConditionExpression(Some(filterExp)).get, child)
    +      case Project(projectList, child) =>
    +        Project(updateNamedExpression(projectList), child)
    +      case Sort(sortOrders, global, child) =>
    +        Sort(updateSortExpression(sortOrders), global, child)
    +      case Join(left, right, joinType, condition) =>
    +        Join(left, right, joinType, updateConditionExpression(condition))
    +    }
    +    updatedPlan
    +  }
     
    -        // below case for handling filter query
    -        // When plan has grouping expression, aggregate expression
    -        // filter expression
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          // getting the columns from filter expression
    -          if(isValidPlan) {
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +  /**
    +   * Below method will be used to update the sort expression
    +   * @param sortExp
    +   * sort order expression in query
    +   * @return updated sort expression
    +   */
    +  def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
    +     sortExp map { order =>
    +      order.child match {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            SortOrder(newExpression, order.direction)
    +          } else {
    +            SortOrder(attr, order.direction)
               }
    -          carbonTable
    +      }
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // logical relation
    -        case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    -        // case for handling aggregation, order by
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -          }
    -          carbonTable
    -        // case for handling aggregation, order by and filter
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    +  /**
    +   * Below method will be used to update the expression like group by expression
    +   * @param expressions
    +   * sequence of expression like group by
    +   * @return updated expressions
    +   */
    +  def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
    +    val newExp = expressions map { expression =>
    +      expression transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            newExpression
    --- End diff --
   
    There is no need to assign this to a variable; return the expression directly.
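
A small illustration of the point, assuming a hypothetical `Attr` case class rather than Spark's `AttributeReference`. In Scala the last expression of a block is its value, so binding it to a name only to return it on the next line adds nothing.

```scala
object ReturnDirectly {
  // Hypothetical stand-in for an attribute (assumption, not the Catalyst class).
  final case class Attr(name: String, nullable: Boolean)

  // Verbose form: the intermediate val serves no purpose.
  def renameVerbose(attr: Attr, newName: String): Attr = {
    val newExpression = Attr(newName, attr.nullable)
    newExpression
  }

  // Direct form: the constructed value is the result of the method.
  def rename(attr: Attr, newName: String): Attr =
    Attr(newName, attr.nullable)
}
```

Both methods return the same value; the second simply drops the redundant binding.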


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159442284
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             needAnalysis = false
             attr
         }
    +    if(needAnalysis) {
    +      needAnalysis = isValidPlan(plan)
    +    }
         // if plan is not valid for transformation then return same plan
         if (!needAnalysis) {
           plan
         } else {
    -      // create buffer to collect all the column and its metadata information
    -      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    -      var isValidPlan = true
    -      val carbonTable = plan match {
    -        // matching the plan based on supported plan
    -        // if plan is matches with any case it will validate and get all
    -        // information required for transforming the plan
    +      val updatedPlan = transformPreAggQueryPlan(plan)
    +      val newPlan = updatePlan(updatedPlan)
    +      print(newPlan.toString())
    +      newPlan
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // subquery
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    +  /**
    +   * Below method will be used to update the child plan
    +   * This will be used for updating expression like join condition,
    +   * order by, project list etc
    +   * @param plan
    +   * child plan
    +   * @return updated plan
    +   */
    +  def updatePlan(plan: LogicalPlan) : LogicalPlan = {
    +    val updatedPlan = plan transform {
    +      case Aggregate(grp, aggExp, child) =>
    +        Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
    +      case Filter(filterExp, child) =>
    +        Filter(updateConditionExpression(Some(filterExp)).get, child)
    +      case Project(projectList, child) =>
    +        Project(updateNamedExpression(projectList), child)
    +      case Sort(sortOrders, global, child) =>
    +        Sort(updateSortExpression(sortOrders), global, child)
    +      case Join(left, right, joinType, condition) =>
    +        Join(left, right, joinType, updateConditionExpression(condition))
    +    }
    +    updatedPlan
    +  }
     
    -        // below case for handling filter query
    -        // When plan has grouping expression, aggregate expression
    -        // filter expression
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          // getting the columns from filter expression
    -          if(isValidPlan) {
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +  /**
    +   * Below method will be used to update the sort expression
    +   * @param sortExp
    +   * sort order expression in query
    +   * @return updated sort expression
    +   */
    +  def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
    +     sortExp map { order =>
    +      order.child match {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            SortOrder(newExpression, order.direction)
    +          } else {
    +            SortOrder(attr, order.direction)
               }
    -          carbonTable
    +      }
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // logical relation
    -        case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    -        // case for handling aggregation, order by
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -          }
    -          carbonTable
    -        // case for handling aggregation, order by and filter
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    +  /**
    +   * Below method will be used to update the expression like group by expression
    +   * @param expressions
    +   * sequence of expression like group by
    +   * @return updated expressions
    +   */
    +  def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
    +    val newExp = expressions map { expression =>
    +      expression transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            newExpression
    +          } else {
    +            attr
               }
    -          if (isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +      }
    +    }
    +    newExp
    +  }
    +
    +  /**
    +   * Below method will be used to updated the named expression like aggregate expression
    +   * @param namedExpression
    +   * any named expression like aggregate expression
    +   * @return updated named expression
    +   */
    +  def updateNamedExpression(namedExpression: Seq[NamedExpression]) : Seq[NamedExpression] = {
    +    namedExpression map {
    +      case attr: AttributeReference =>
    --- End diff --
   
    Use transform inside map; then you don't need to handle Alias separately.
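
A self-contained sketch of why this works, using a toy expression tree instead of Spark Catalyst. `Expr`, `Attr`, `Alias`, `Sum`, and `childColumn` are all hypothetical names introduced for illustration. The toy `transform` mimics a top-down tree rewrite: it applies the rule to the node, then recurses into the children. Because the recursion reaches attributes nested under an alias, a single `case` for the attribute is enough.

```scala
object TransformInsideMap {
  // Toy expression tree (assumption: a simplified stand-in for Catalyst expressions).
  sealed trait Expr {
    // Apply the rule to this node where it is defined, then recurse into the children.
    def transform(rule: PartialFunction[Expr, Expr]): Expr = {
      val afterRule = rule.applyOrElse(this, identity[Expr])
      afterRule match {
        case Alias(child, name) => Alias(child.transform(rule), name)
        case Sum(child)         => Sum(child.transform(rule))
        case leaf               => leaf
      }
    }
  }
  final case class Attr(name: String) extends Expr
  final case class Alias(child: Expr, name: String) extends Expr
  final case class Sum(child: Expr) extends Expr

  // Hypothetical parent-column to child-column mapping.
  val childColumn: Map[String, String] = Map("price" -> "t_price_sum")

  // One rule handles bare attributes and attributes nested inside Alias or Sum.
  def updateNamed(exprs: Seq[Expr]): Seq[Expr] =
    exprs.map(_.transform {
      case Attr(name) if childColumn.contains(name) => Attr(childColumn(name))
    })
}
```

With this shape, `Alias(Sum(Attr("price")), "total")` is rewritten through the same rule as a bare `Attr("price")`, so no dedicated `case alias: Alias` branch is required.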


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159442746
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             needAnalysis = false
             attr
         }
    +    if(needAnalysis) {
    +      needAnalysis = isValidPlan(plan)
    +    }
         // if plan is not valid for transformation then return same plan
         if (!needAnalysis) {
           plan
         } else {
    -      // create buffer to collect all the column and its metadata information
    -      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    -      var isValidPlan = true
    -      val carbonTable = plan match {
    -        // matching the plan based on supported plan
    -        // if plan is matches with any case it will validate and get all
    -        // information required for transforming the plan
    +      val updatedPlan = transformPreAggQueryPlan(plan)
    +      val newPlan = updatePlan(updatedPlan)
    +      print(newPlan.toString())
    +      newPlan
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // subquery
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    +  /**
    +   * Below method will be used to update the child plan
    +   * This will be used for updating expression like join condition,
    +   * order by, project list etc
    +   * @param plan
    +   * child plan
    +   * @return updated plan
    +   */
    +  def updatePlan(plan: LogicalPlan) : LogicalPlan = {
    +    val updatedPlan = plan transform {
    +      case Aggregate(grp, aggExp, child) =>
    +        Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
    +      case Filter(filterExp, child) =>
    +        Filter(updateConditionExpression(Some(filterExp)).get, child)
    +      case Project(projectList, child) =>
    +        Project(updateNamedExpression(projectList), child)
    +      case Sort(sortOrders, global, child) =>
    +        Sort(updateSortExpression(sortOrders), global, child)
    +      case Join(left, right, joinType, condition) =>
    +        Join(left, right, joinType, updateConditionExpression(condition))
    +    }
    +    updatedPlan
    +  }
     
    -        // below case for handling filter query
    -        // When plan has grouping expression, aggregate expression
    -        // filter expression
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          // getting the columns from filter expression
    -          if(isValidPlan) {
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +  /**
    +   * Below method will be used to update the sort expression
    +   * @param sortExp
    +   * sort order expression in query
    +   * @return updated sort expression
    +   */
    +  def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
    +     sortExp map { order =>
    +      order.child match {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            SortOrder(newExpression, order.direction)
    +          } else {
    +            SortOrder(attr, order.direction)
               }
    -          carbonTable
    +      }
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // logical relation
    -        case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    -        // case for handling aggregation, order by
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -          }
    -          carbonTable
    -        // case for handling aggregation, order by and filter
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    +  /**
    +   * Below method will be used to update the expression like group by expression
    +   * @param expressions
    +   * sequence of expression like group by
    +   * @return updated expressions
    +   */
    +  def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
    +    val newExp = expressions map { expression =>
    +      expression transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            newExpression
    +          } else {
    +            attr
               }
    -          if (isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +      }
    +    }
    +    newExp
    +  }
    +
    +  /**
    +   * Below method will be used to updated the named expression like aggregate expression
    +   * @param namedExpression
    +   * any named expression like aggregate expression
    +   * @return updated named expression
    +   */
    +  def updateNamedExpression(namedExpression: Seq[NamedExpression]) : Seq[NamedExpression] = {
    --- End diff --
   
    You can use `updateExpression` directly; there is no need for this method. `NamedExpression` is derived from `Expression`, so after the update you can cast the result back to `NamedExpression`.
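
A minimal sketch of the suggestion above, assuming a toy expression tree rather than Spark's Catalyst classes. The types `Expression`, `NamedExpression`, `Attr`, `Alias` and `Sum`, the object `UpdateSketch`, the helper `updateNamed` and the column names are all hypothetical stand-ins; only the idea (one generic rewrite, then a cast) comes from the comment.

```scala
// Toy stand-ins for the Catalyst hierarchy (hypothetical, not Spark's classes).
sealed trait Expression {
  def mapChildren(f: Expression => Expression): Expression

  // Top-down rewrite: apply the rule to this node, then recurse into the children.
  def transform(rule: PartialFunction[Expression, Expression]): Expression = {
    val afterRule = if (rule.isDefinedAt(this)) rule(this) else this
    afterRule.mapChildren(_.transform(rule))
  }
}
sealed trait NamedExpression extends Expression { def name: String }

final case class Attr(name: String) extends NamedExpression {
  def mapChildren(f: Expression => Expression): Expression = this
}
final case class Alias(child: Expression, name: String) extends NamedExpression {
  def mapChildren(f: Expression => Expression): Expression = copy(child = f(child))
}
final case class Sum(child: Expression) extends Expression {
  def mapChildren(f: Expression => Expression): Expression = copy(child = f(child))
}

object UpdateSketch {
  // Assumed mapping: parent-table attribute -> child (pre-aggregate) table attribute.
  val updatedExpression: Map[Attr, Attr] = Map(Attr("price") -> Attr("t_price_sum"))

  // The single generic rewrite: replace every mapped attribute, keep the rest.
  def updateExpression(expressions: Seq[Expression]): Seq[Expression] =
    expressions.map(_.transform {
      case attr: Attr => updatedExpression.getOrElse(attr, attr)
    })

  // Named expressions reuse the generic rewrite and cast back. The cast is safe
  // in this sketch because the rule only swaps one Attr for another, so an Alias
  // at the root stays an Alias and an Attr stays an Attr.
  def updateNamed(expressions: Seq[NamedExpression]): Seq[NamedExpression] =
    updateExpression(expressions).map(_.asInstanceOf[NamedExpression])
}
```

In this sketch `UpdateSketch.updateNamed(Seq(Alias(Sum(Attr("price")), "total")))` rewrites the attribute nested inside the alias and leaves the alias name untouched. Whether the same cast is safe in the real rule depends on the rewrite never replacing a root `NamedExpression` with an unnamed one.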


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159443128
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             needAnalysis = false
             attr
         }
    +    if(needAnalysis) {
    +      needAnalysis = isValidPlan(plan)
    +    }
         // if plan is not valid for transformation then return same plan
         if (!needAnalysis) {
           plan
         } else {
    -      // create buffer to collect all the column and its metadata information
    -      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    -      var isValidPlan = true
    -      val carbonTable = plan match {
    -        // matching the plan based on supported plan
    -        // if plan is matches with any case it will validate and get all
    -        // information required for transforming the plan
    +      val updatedPlan = transformPreAggQueryPlan(plan)
    +      val newPlan = updatePlan(updatedPlan)
    +      print(newPlan.toString())
    +      newPlan
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // subquery
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    +  /**
    +   * Below method will be used to update the child plan
    +   * This will be used for updating expression like join condition,
    +   * order by, project list etc
    +   * @param plan
    +   * child plan
    +   * @return updated plan
    +   */
    +  def updatePlan(plan: LogicalPlan) : LogicalPlan = {
    +    val updatedPlan = plan transform {
    +      case Aggregate(grp, aggExp, child) =>
    +        Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
    +      case Filter(filterExp, child) =>
    +        Filter(updateConditionExpression(Some(filterExp)).get, child)
    +      case Project(projectList, child) =>
    +        Project(updateNamedExpression(projectList), child)
    +      case Sort(sortOrders, global, child) =>
    +        Sort(updateSortExpression(sortOrders), global, child)
    +      case Join(left, right, joinType, condition) =>
    +        Join(left, right, joinType, updateConditionExpression(condition))
    +    }
    +    updatedPlan
    +  }
     
    -        // below case for handling filter query
    -        // When plan has grouping expression, aggregate expression
    -        // filter expression
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          // getting the columns from filter expression
    -          if(isValidPlan) {
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +  /**
    +   * Below method will be used to update the sort expression
    +   * @param sortExp
    +   * sort order expression in query
    +   * @return updated sort expression
    +   */
    +  def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
    +     sortExp map { order =>
    +      order.child match {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            SortOrder(newExpression, order.direction)
    +          } else {
    +            SortOrder(attr, order.direction)
               }
    -          carbonTable
    +      }
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // logical relation
    -        case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    -        // case for handling aggregation, order by
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -          }
    -          carbonTable
    -        // case for handling aggregation, order by and filter
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    +  /**
    +   * Below method will be used to update the expression like group by expression
    +   * @param expressions
    +   * sequence of expression like group by
    +   * @return updated expressions
    +   */
    +  def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
    +    val newExp = expressions map { expression =>
    +      expression transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            newExpression
    +          } else {
    +            attr
               }
    -          if (isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +      }
    +    }
    +    newExp
    +  }
    +
    +  /**
    +   * Below method will be used to updated the named expression like aggregate expression
    +   * @param namedExpression
    +   * any named expression like aggregate expression
    +   * @return updated named expression
    +   */
    +  def updateNamedExpression(namedExpression: Seq[NamedExpression]) : Seq[NamedExpression] = {
    +    namedExpression map {
    +      case attr: AttributeReference =>
    +        val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +        if(childExp.isDefined) {
    +          val newExp = AttributeReference(
    +            childExp.get._2.name,
    +            childExp.get._2.dataType,
    +            childExp.get._2.nullable,
    +            childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +          newExp
    +        } else {
    +          attr
    +        }
    +      case alias@Alias(exp, name) =>
    +        val newExp = exp.transform {
    +          case attr: AttributeReference =>
    +            val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +            if (childExp.isDefined) {
    +              val newExp = AttributeReference(
    +                childExp.get._2.name,
    +                childExp.get._2.dataType,
    +                childExp.get._2.nullable,
    +                childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +              newExp
    +            } else {
    +              attr
    +            }
    +        }
    +        Alias(newExp, name)(alias.exprId, alias.qualifier, Some(alias.metadata), alias.isGenerated)
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to updated condition expression
    +   * @param conditionExp
    +   * any condition expression join condition or filter condition
    +   * @return updated condition expression
    +   */
    +  def updateConditionExpression(conditionExp: Option[Expression]): Option[Expression] = {
    --- End diff --
   
    Here too, you can use the `updateExpression` method; there is no need for this method.
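
One way this could look, again as a sketch over a toy tree and not the PR's actual code. `Expr`, `Col`, `EqualTo`, `Lit`, `ConditionSketch` and `updateCondition` are hypothetical names; the point is only that an optional join or filter condition can be routed through the same sequence-based rewrite with `Option.map`.

```scala
// Toy condition tree (hypothetical stand-ins, not Catalyst classes).
sealed trait Expr
final case class Col(name: String) extends Expr
final case class Lit(value: Int) extends Expr
final case class EqualTo(left: Expr, right: Expr) extends Expr

object ConditionSketch {
  // Assumed mapping: parent-table column -> child-table column.
  val updatedExpression: Map[Col, Col] = Map(Col("country") -> Col("t_country"))

  // Recursive rewrite of a single expression.
  private def rewrite(e: Expr): Expr = e match {
    case c: Col        => updatedExpression.getOrElse(c, c)
    case EqualTo(l, r) => EqualTo(rewrite(l), rewrite(r))
    case lit: Lit      => lit
  }

  // The one generic entry point, shaped like updateExpression in the diff.
  def updateExpression(expressions: Seq[Expr]): Seq[Expr] = expressions.map(rewrite)

  // An optional condition: wrap it in a one-element Seq, rewrite, unwrap.
  // None (for example a join without a condition) passes through unchanged.
  def updateCondition(condition: Option[Expr]): Option[Expr] =
    condition.map(c => updateExpression(Seq(c)).head)
}
```

With this shape, a `Filter` case can pass `Some(filterExp)` and a `Join` case can pass its `Option[Expression]` as is, so no separate condition-specific traversal is needed.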


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159443443
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             needAnalysis = false
             attr
         }
    +    if(needAnalysis) {
    +      needAnalysis = isValidPlan(plan)
    +    }
         // if plan is not valid for transformation then return same plan
         if (!needAnalysis) {
           plan
         } else {
    -      // create buffer to collect all the column and its metadata information
    -      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    -      var isValidPlan = true
    -      val carbonTable = plan match {
    -        // matching the plan based on supported plan
    -        // if plan is matches with any case it will validate and get all
    -        // information required for transforming the plan
    +      val updatedPlan = transformPreAggQueryPlan(plan)
    +      val newPlan = updatePlan(updatedPlan)
    +      print(newPlan.toString())
    +      newPlan
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // subquery
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    +  /**
    +   * Below method will be used to update the child plan
    +   * This will be used for updating expression like join condition,
    +   * order by, project list etc
    +   * @param plan
    +   * child plan
    +   * @return updated plan
    +   */
    +  def updatePlan(plan: LogicalPlan) : LogicalPlan = {
    +    val updatedPlan = plan transform {
    +      case Aggregate(grp, aggExp, child) =>
    +        Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
    +      case Filter(filterExp, child) =>
    +        Filter(updateConditionExpression(Some(filterExp)).get, child)
    +      case Project(projectList, child) =>
    +        Project(updateNamedExpression(projectList), child)
    +      case Sort(sortOrders, global, child) =>
    +        Sort(updateSortExpression(sortOrders), global, child)
    +      case Join(left, right, joinType, condition) =>
    +        Join(left, right, joinType, updateConditionExpression(condition))
    +    }
    +    updatedPlan
    +  }
     
    -        // below case for handling filter query
    -        // When plan has grouping expression, aggregate expression
    -        // filter expression
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          // getting the columns from filter expression
    -          if(isValidPlan) {
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +  /**
    +   * Below method will be used to update the sort expression
    +   * @param sortExp
    +   * sort order expression in query
    +   * @return updated sort expression
    +   */
    +  def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
    +     sortExp map { order =>
    +      order.child match {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            SortOrder(newExpression, order.direction)
    +          } else {
    +            SortOrder(attr, order.direction)
               }
    -          carbonTable
    +      }
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // logical relation
    -        case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    -        // case for handling aggregation, order by
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -          }
    -          carbonTable
    -        // case for handling aggregation, order by and filter
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    +  /**
    +   * Below method will be used to update the expression like group by expression
    +   * @param expressions
    +   * sequence of expression like group by
    +   * @return updated expressions
    +   */
    +  def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
    +    val newExp = expressions map { expression =>
    +      expression transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            newExpression
    +          } else {
    +            attr
               }
    -          if (isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +      }
    +    }
    +    newExp
    +  }
    +
    +  /**
    +   * Below method will be used to updated the named expression like aggregate expression
    +   * @param namedExpression
    +   * any named expression like aggregate expression
    +   * @return updated named expression
    +   */
    +  def updateNamedExpression(namedExpression: Seq[NamedExpression]) : Seq[NamedExpression] = {
    +    namedExpression map {
    +      case attr: AttributeReference =>
    +        val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +        if(childExp.isDefined) {
    +          val newExp = AttributeReference(
    +            childExp.get._2.name,
    +            childExp.get._2.dataType,
    +            childExp.get._2.nullable,
    +            childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +          newExp
    +        } else {
    +          attr
    +        }
    +      case alias@Alias(exp, name) =>
    +        val newExp = exp.transform {
    +          case attr: AttributeReference =>
    +            val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +            if (childExp.isDefined) {
    +              val newExp = AttributeReference(
    +                childExp.get._2.name,
    +                childExp.get._2.dataType,
    +                childExp.get._2.nullable,
    +                childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +              newExp
    +            } else {
    +              attr
    +            }
    +        }
    +        Alias(newExp, name)(alias.exprId, alias.qualifier, Some(alias.metadata), alias.isGenerated)
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to updated condition expression
    +   * @param conditionExp
    +   * any condition expression join condition or filter condition
    +   * @return updated condition expression
    +   */
    +  def updateConditionExpression(conditionExp: Option[Expression]): Option[Expression] = {
    +    if (conditionExp.isDefined) {
    +      val filterExp = conditionExp.get
    +      Some(filterExp.transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +          if(childExp.isDefined) {
    +            childExp.get._2
    +          } else {
    +            attr
               }
    -          carbonTable
    -        // case for handling aggregation with order by when only projection column exits
    -        case Sort(sortOrders,
    -          _,
    -          Aggregate(groupingExp,
    -            aggregateExp,
    -            CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    +      })
    +    } else {
    +      conditionExp
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to validate and transform the main table plan to child table plan
    +   * rules for transforming is as below.
    +   * 1. Grouping expression rules
    +   *    1.1 Change the parent attribute reference for of group expression
    +   * to child attribute reference
    +   *
    +   * 2. Aggregate expression rules
    +   *    2.1 Change the parent attribute reference for of group expression to
    +   * child attribute reference
    +   *    2.2 Change the count AggregateExpression to Sum as count
    +   * is already calculated so in case of aggregate table
    +   * we need to apply sum to get the count
    +   *    2.2 In case of average aggregate function select 2 columns from aggregate table with
    +   * aggregation sum and count. Then add divide(sum(column with sum), sum(column with count)).
    +   * Note: During aggregate table creation for average table will be created with two columns
    +   * one for sum(column) and count(column) to support rollup
    +   * 3. Filter Expression rules.
    +   *    3.1 Updated filter expression attributes with child table attributes
    +   * 4. Update the Parent Logical relation with child Logical relation
    +   * 5. timeseries function
    +   *    5.1 validate parent table has timeseries datamap
    +   *    5.2 timeseries function is valid function or not
    +   *
    +   * @param logicalPlan
    +   * parent logical plan
    +   * @return transformed plan
    +   */
    +  def transformPreAggQueryPlan(logicalPlan: LogicalPlan): LogicalPlan = {
    +    val updatedPlan = logicalPlan.transform {
    +      // case for aggregation query
    +      case agg@Aggregate(grExp,
    --- End diff --
   
    Move `grExp` down to the next line


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159443569
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             needAnalysis = false
             attr
         }
    +    if(needAnalysis) {
    +      needAnalysis = isValidPlan(plan)
    +    }
         // if plan is not valid for transformation then return same plan
         if (!needAnalysis) {
           plan
         } else {
    -      // create buffer to collect all the column and its metadata information
    -      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    -      var isValidPlan = true
    -      val carbonTable = plan match {
    -        // matching the plan based on supported plan
    -        // if plan is matches with any case it will validate and get all
    -        // information required for transforming the plan
    +      val updatedPlan = transformPreAggQueryPlan(plan)
    +      val newPlan = updatePlan(updatedPlan)
    +      print(newPlan.toString())
    +      newPlan
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // subquery
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    +  /**
    +   * Below method will be used to update the child plan
    +   * This will be used for updating expression like join condition,
    +   * order by, project list etc
    +   * @param plan
    +   * child plan
    +   * @return updated plan
    +   */
    +  def updatePlan(plan: LogicalPlan) : LogicalPlan = {
    +    val updatedPlan = plan transform {
    +      case Aggregate(grp, aggExp, child) =>
    +        Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
    +      case Filter(filterExp, child) =>
    +        Filter(updateConditionExpression(Some(filterExp)).get, child)
    +      case Project(projectList, child) =>
    +        Project(updateNamedExpression(projectList), child)
    +      case Sort(sortOrders, global, child) =>
    +        Sort(updateSortExpression(sortOrders), global, child)
    +      case Join(left, right, joinType, condition) =>
    +        Join(left, right, joinType, updateConditionExpression(condition))
    +    }
    +    updatedPlan
    +  }
     
    -        // below case for handling filter query
    -        // When plan has grouping expression, aggregate expression
    -        // filter expression
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          // getting the columns from filter expression
    -          if(isValidPlan) {
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +  /**
    +   * Below method will be used to update the sort expression
    +   * @param sortExp
    +   * sort order expression in query
    +   * @return updated sort expression
    +   */
    +  def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
    +     sortExp map { order =>
    +      order.child match {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            SortOrder(newExpression, order.direction)
    +          } else {
    +            SortOrder(attr, order.direction)
               }
    -          carbonTable
    +      }
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // logical relation
    -        case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    -        // case for handling aggregation, order by
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -          }
    -          carbonTable
    -        // case for handling aggregation, order by and filter
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    +  /**
    +   * Below method will be used to update the expression like group by expression
    +   * @param expressions
    +   * sequence of expression like group by
    +   * @return updated expressions
    +   */
    +  def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
    +    val newExp = expressions map { expression =>
    +      expression transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            newExpression
    +          } else {
    +            attr
               }
    -          if (isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +      }
    +    }
    +    newExp
    +  }
    +
    +  /**
    +   * Below method will be used to updated the named expression like aggregate expression
    +   * @param namedExpression
    +   * any named expression like aggregate expression
    +   * @return updated named expression
    +   */
    +  def updateNamedExpression(namedExpression: Seq[NamedExpression]) : Seq[NamedExpression] = {
    +    namedExpression map {
    +      case attr: AttributeReference =>
    +        val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +        if(childExp.isDefined) {
    +          val newExp = AttributeReference(
    +            childExp.get._2.name,
    +            childExp.get._2.dataType,
    +            childExp.get._2.nullable,
    +            childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +          newExp
    +        } else {
    +          attr
    +        }
    +      case alias@Alias(exp, name) =>
    +        val newExp = exp.transform {
    +          case attr: AttributeReference =>
    +            val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +            if (childExp.isDefined) {
    +              val newExp = AttributeReference(
    +                childExp.get._2.name,
    +                childExp.get._2.dataType,
    +                childExp.get._2.nullable,
    +                childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +              newExp
    +            } else {
    +              attr
    +            }
    +        }
    +        Alias(newExp, name)(alias.exprId, alias.qualifier, Some(alias.metadata), alias.isGenerated)
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to updated condition expression
    +   * @param conditionExp
    +   * any condition expression join condition or filter condition
    +   * @return updated condition expression
    +   */
    +  def updateConditionExpression(conditionExp: Option[Expression]): Option[Expression] = {
    +    if (conditionExp.isDefined) {
    +      val filterExp = conditionExp.get
    +      Some(filterExp.transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +          if(childExp.isDefined) {
    +            childExp.get._2
    +          } else {
    +            attr
               }
    -          carbonTable
    -        // case for handling aggregation with order by when only projection column exits
    -        case Sort(sortOrders,
    -          _,
    -          Aggregate(groupingExp,
    -            aggregateExp,
    -            CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    +      })
    +    } else {
    +      conditionExp
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to validate and transform the main table plan to child table plan
    +   * rules for transforming is as below.
    +   * 1. Grouping expression rules
    +   *    1.1 Change the parent attribute reference for of group expression
    +   * to child attribute reference
    +   *
    +   * 2. Aggregate expression rules
    +   *    2.1 Change the parent attribute reference for of group expression to
    +   * child attribute reference
    +   *    2.2 Change the count AggregateExpression to Sum as count
    +   * is already calculated so in case of aggregate table
    +   * we need to apply sum to get the count
    +   *    2.2 In case of average aggregate function select 2 columns from aggregate table with
    +   * aggregation sum and count. Then add divide(sum(column with sum), sum(column with count)).
    +   * Note: During aggregate table creation for average table will be created with two columns
    +   * one for sum(column) and count(column) to support rollup
    +   * 3. Filter Expression rules.
    +   *    3.1 Updated filter expression attributes with child table attributes
    +   * 4. Update the Parent Logical relation with child Logical relation
    +   * 5. timeseries function
    +   *    5.1 validate parent table has timeseries datamap
    +   *    5.2 timeseries function is valid function or not
    +   *
    +   * @param logicalPlan
    +   * parent logical plan
    +   * @return transformed plan
    +   */
    +  def transformPreAggQueryPlan(logicalPlan: LogicalPlan): LogicalPlan = {
    +    val updatedPlan = logicalPlan.transform {
    +      // case for aggregation query
    +      case agg@Aggregate(grExp,
    +      aggExp,
    +      child@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    --- End diff --
   
    Indentation is wrong
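
The rollup rules listed in the quoted doc comment (count becomes a sum over the stored count column; average becomes the sum of the sum column divided by the sum of the count column) can be checked with a small sketch. The types and names below (`PreAggRow`, `Rollup`, `sumSalary`, `countSalary`) are hypothetical stand-ins for illustration, not CarbonData classes.

```scala
// Hypothetical row of a pre-aggregate table: one row per group,
// storing sum(salary) and count(salary) of the parent rows in that group.
final case class PreAggRow(group: String, sumSalary: Long, countSalary: Long)

object Rollup {
  // count(salary) on the parent table == sum of the stored count column
  def count(rows: Seq[PreAggRow]): Long =
    rows.map(_.countSalary).sum

  // avg(salary) on the parent table == sum(sum column) / sum(count column).
  // Averaging per-group averages would weight small groups too heavily.
  // Note: an empty input yields NaN here (0.0 / 0).
  def avg(rows: Seq[PreAggRow]): Double =
    rows.map(_.sumSalary).sum.toDouble / count(rows)
}
```

With parent values 10, 20, 30 in one group and 40 in another, the stored rows are (60, 3) and (40, 1); the sketch returns a count of 4 and an average of 25.0, matching the parent table, whereas averaging the two group averages (20 and 40) would give 30.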


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kumarvishal09 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159457280
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             needAnalysis = false
             attr
         }
    +    if(needAnalysis) {
    +      needAnalysis = isValidPlan(plan)
    +    }
         // if plan is not valid for transformation then return same plan
         if (!needAnalysis) {
           plan
         } else {
    -      // create buffer to collect all the column and its metadata information
    -      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    -      var isValidPlan = true
    -      val carbonTable = plan match {
    -        // matching the plan based on supported plan
    -        // if plan is matches with any case it will validate and get all
    -        // information required for transforming the plan
    +      val updatedPlan = transformPreAggQueryPlan(plan)
    +      val newPlan = updatePlan(updatedPlan)
    +      print(newPlan.toString())
    +      newPlan
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // subquery
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    +  /**
    +   * Below method will be used to update the child plan
    +   * This will be used for updating expression like join condition,
    +   * order by, project list etc
    +   * @param plan
    +   * child plan
    +   * @return updated plan
    +   */
    +  def updatePlan(plan: LogicalPlan) : LogicalPlan = {
    +    val updatedPlan = plan transform {
    +      case Aggregate(grp, aggExp, child) =>
    +        Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
    +      case Filter(filterExp, child) =>
    +        Filter(updateConditionExpression(Some(filterExp)).get, child)
    +      case Project(projectList, child) =>
    +        Project(updateNamedExpression(projectList), child)
    +      case Sort(sortOrders, global, child) =>
    +        Sort(updateSortExpression(sortOrders), global, child)
    +      case Join(left, right, joinType, condition) =>
    +        Join(left, right, joinType, updateConditionExpression(condition))
    +    }
    +    updatedPlan
    +  }
     
    -        // below case for handling filter query
    -        // When plan has grouping expression, aggregate expression
    -        // filter expression
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          // getting the columns from filter expression
    -          if(isValidPlan) {
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +  /**
    +   * Below method will be used to update the sort expression
    +   * @param sortExp
    +   * sort order expression in query
    +   * @return updated sort expression
    +   */
    +  def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
    +     sortExp map { order =>
    +      order.child match {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    --- End diff --
   
    childExp.qualifier will not have the table alias name; in case of a join we need the table alias name
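
The point about qualifiers can be illustrated with a toy model. This is only a sketch: `Attr` and `Qualifier.toChild` are hypothetical names, and the real Catalyst `AttributeReference` has more fields than shown here.

```scala
// Toy stand-in for an attribute: name, expression id, optional table alias.
final case class Attr(name: String, exprId: Long, qualifier: Option[String])

object Qualifier {
  // Take name and id from the child-table attribute, but keep the
  // qualifier (table alias) that the query attribute carried.
  def toChild(queryAttr: Attr, childAttr: Attr): Attr =
    childAttr.copy(qualifier = queryAttr.qualifier)
}
```

In a self-join written with aliases `a` and `b`, both sides may map to the same child column; keeping the qualifier from the query attribute is what lets the two rewritten attributes still be told apart by alias.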


---

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159457864
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -806,67 +824,105 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         val updatedAggExp = aggregateExpressions.map {
    --- End diff --
   
    This looks like a lot of duplicate code; better to use transform and handle everything here with a single `case attr: AttributeReference`
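
One way to read this suggestion is sketched below on a toy expression tree. Everything here (`Expr`, `Attr`, `Add`, `Alias`, `AttrRewrite`) is a hypothetical stand-in rather than the Catalyst API; the toy `transform` rewrites a node first and then its children, and the mapping is keyed by expression id as an assumption.

```scala
// Toy expression tree; not the Catalyst classes.
sealed trait Expr {
  // Pre-order rewrite: apply the rule to this node, then recurse into children.
  def transform(rule: PartialFunction[Expr, Expr]): Expr = {
    val afterRule = rule.applyOrElse(this, (e: Expr) => e)
    afterRule match {
      case Add(l, r)   => Add(l.transform(rule), r.transform(rule))
      case Alias(c, n) => Alias(c.transform(rule), n)
      case leaf        => leaf
    }
  }
}
final case class Attr(name: String, exprId: Long, qualifier: Option[String]) extends Expr
final case class Add(left: Expr, right: Expr) extends Expr
final case class Alias(child: Expr, name: String) extends Expr

object AttrRewrite {
  // The single attribute case: map a parent attribute to its child-table
  // attribute (looked up by exprId), keeping the query-side qualifier.
  def rule(mapping: Map[Long, Attr]): PartialFunction[Expr, Expr] = {
    case attr: Attr =>
      mapping.get(attr.exprId)
        .map(child => child.copy(qualifier = attr.qualifier))
        .getOrElse(attr)
  }

  // Grouping, aggregate, sort and condition expressions can all share this.
  def updateAll(exprs: Seq[Expr], mapping: Map[Long, Attr]): Seq[Expr] =
    exprs.map(_.transform(rule(mapping)))
}
```

Because the rule is a value, the per-kind helpers quoted in the diff could each reduce to a call to one shared function instead of repeating the lookup and rebuild logic.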


---