[GitHub] carbondata pull request #1728: [CARBONDATA-1926] Expression support inside a...

classic Classic list List threaded Threaded
72 messages Options
1234
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159458812
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             needAnalysis = false
             attr
         }
    +    if(needAnalysis) {
    +      needAnalysis = isValidPlan(plan)
    +    }
         // if plan is not valid for transformation then return same plan
         if (!needAnalysis) {
           plan
         } else {
    -      // create buffer to collect all the column and its metadata information
    -      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    -      var isValidPlan = true
    -      val carbonTable = plan match {
    -        // matching the plan based on supported plan
    -        // if plan is matches with any case it will validate and get all
    -        // information required for transforming the plan
    +      val updatedPlan = transformPreAggQueryPlan(plan)
    +      val newPlan = updatePlan(updatedPlan)
    +      print(newPlan.toString())
    +      newPlan
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // subquery
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    +  /**
    +   * Below method will be used to update the child plan
    +   * This will be used for updating expression like join condition,
    +   * order by, project list etc
    +   * @param plan
    +   * child plan
    +   * @return updated plan
    +   */
    +  def updatePlan(plan: LogicalPlan) : LogicalPlan = {
    +    val updatedPlan = plan transform {
    +      case Aggregate(grp, aggExp, child) =>
    +        Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
    +      case Filter(filterExp, child) =>
    +        Filter(updateConditionExpression(Some(filterExp)).get, child)
    +      case Project(projectList, child) =>
    +        Project(updateNamedExpression(projectList), child)
    +      case Sort(sortOrders, global, child) =>
    +        Sort(updateSortExpression(sortOrders), global, child)
    +      case Join(left, right, joinType, condition) =>
    +        Join(left, right, joinType, updateConditionExpression(condition))
    +    }
    +    updatedPlan
    +  }
     
    -        // below case for handling filter query
    -        // When plan has grouping expression, aggregate expression
    -        // filter expression
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          // getting the columns from filter expression
    -          if(isValidPlan) {
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +  /**
    +   * Below method will be used to update the sort expression
    +   * @param sortExp
    +   * sort order expression in query
    +   * @return updated sort expression
    +   */
    +  def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
    +     sortExp map { order =>
    +      order.child match {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            SortOrder(newExpression, order.direction)
    +          } else {
    +            SortOrder(attr, order.direction)
               }
    -          carbonTable
    +      }
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // logical relation
    -        case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    -        // case for handling aggregation, order by
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -          }
    -          carbonTable
    -        // case for handling aggregation, order by and filter
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    +  /**
    +   * Below method will be used to update the expression like group by expression
    +   * @param expressions
    +   * sequence of expression like group by
    +   * @return updated expressions
    +   */
    +  def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
    +    val newExp = expressions map { expression =>
    +      expression transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            newExpression
    +          } else {
    +            attr
               }
    -          if (isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +      }
    +    }
    +    newExp
    +  }
    +
    +  /**
    +   * Below method will be used to updated the named expression like aggregate expression
    +   * @param namedExpression
    +   * any named expression like aggregate expression
    +   * @return updated named expression
    +   */
    +  def updateNamedExpression(namedExpression: Seq[NamedExpression]) : Seq[NamedExpression] = {
    +    namedExpression map {
    +      case attr: AttributeReference =>
    +        val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +        if(childExp.isDefined) {
    +          val newExp = AttributeReference(
    +            childExp.get._2.name,
    +            childExp.get._2.dataType,
    +            childExp.get._2.nullable,
    +            childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +          newExp
    +        } else {
    +          attr
    +        }
    +      case alias@Alias(exp, name) =>
    +        val newExp = exp.transform {
    +          case attr: AttributeReference =>
    +            val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +            if (childExp.isDefined) {
    +              val newExp = AttributeReference(
    +                childExp.get._2.name,
    +                childExp.get._2.dataType,
    +                childExp.get._2.nullable,
    +                childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +              newExp
    +            } else {
    +              attr
    +            }
    +        }
    +        Alias(newExp, name)(alias.exprId, alias.qualifier, Some(alias.metadata), alias.isGenerated)
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to updated condition expression
    +   * @param conditionExp
    +   * any condition expression join condition or filter condition
    +   * @return updated condition expression
    +   */
    +  def updateConditionExpression(conditionExp: Option[Expression]): Option[Expression] = {
    +    if (conditionExp.isDefined) {
    +      val filterExp = conditionExp.get
    +      Some(filterExp.transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +          if(childExp.isDefined) {
    +            childExp.get._2
    +          } else {
    +            attr
               }
    -          carbonTable
    -        // case for handling aggregation with order by when only projection column exits
    -        case Sort(sortOrders,
    -          _,
    -          Aggregate(groupingExp,
    -            aggregateExp,
    -            CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    +      })
    +    } else {
    +      conditionExp
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to validate and transform the main table plan to child table plan
    +   * rules for transforming is as below.
    +   * 1. Grouping expression rules
    +   *    1.1 Change the parent attribute reference for of group expression
    +   * to child attribute reference
    +   *
    +   * 2. Aggregate expression rules
    +   *    2.1 Change the parent attribute reference for of group expression to
    +   * child attribute reference
    +   *    2.2 Change the count AggregateExpression to Sum as count
    +   * is already calculated so in case of aggregate table
    +   * we need to apply sum to get the count
    +   *    2.2 In case of average aggregate function select 2 columns from aggregate table with
    +   * aggregation sum and count. Then add divide(sum(column with sum), sum(column with count)).
    +   * Note: During aggregate table creation for average table will be created with two columns
    +   * one for sum(column) and count(column) to support rollup
    +   * 3. Filter Expression rules.
    +   *    3.1 Updated filter expression attributes with child table attributes
    +   * 4. Update the Parent Logical relation with child Logical relation
    +   * 5. timeseries function
    +   *    5.1 validate parent table has timeseries datamap
    +   *    5.2 timeseries function is valid function or not
    +   *
    +   * @param logicalPlan
    +   * parent logical plan
    +   * @return transformed plan
    +   */
    +  def transformPreAggQueryPlan(logicalPlan: LogicalPlan): LogicalPlan = {
    +    val updatedPlan = logicalPlan.transform {
    +      // case for aggregation query
    +      case agg@Aggregate(grExp,
    +      aggExp,
    +      child@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    +        if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    +           logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    +             metaData.hasAggregateDataMapSchema =>
    +        val carbonTable = getCarbonTable(logicalRelation)
    +        val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    +        val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression]
    +        val isValidPlan = extractQueryColumnsFromAggExpression(
    +          grExp,
    +          aggExp,
    +          carbonTable,
    +          list,
    +          aggregateExpressions)
    +        if(isValidPlan) {
    +          val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list,
    +            aggregateExpressions,
                 carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            list ++ extractQueryColumnForOrderBy(sortOrders = sortOrders,
    -              carbonTable = carbonTable,
    -              tableName = tableName)
    +            logicalRelation)
    +          if(null != aggDataMapSchema && null!= childPlan) {
    +            val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]]
    +            val (updatedGroupExp, updatedAggExp, newChild, None) =
    +              getUpdatedExpressions(grExp,
    +                aggExp,
    +                child,
    +                None,
    +                aggDataMapSchema,
    +                attributes,
    +                childPlan,
    +                carbonTable,
    +                logicalRelation)
    +            Aggregate(updatedGroupExp,
    +              updatedAggExp,
    +              newChild)
    +          } else {
    +            agg
               }
    -          carbonTable
    -        // case for handling aggregation with order by and filter when only projection column exits
    -        case Sort(sortOrders,
    -          _,
    -          Aggregate(groupingExp,
    -            aggregateExp,
    -            Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    +        } else {
    +          agg
    +        }
    +      // case of handling aggregation query with filter
    +      case agg@Aggregate(grExp,
    --- End diff --
   
    How about with Sort ?


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159459370
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             needAnalysis = false
             attr
         }
    +    if(needAnalysis) {
    +      needAnalysis = isValidPlan(plan)
    +    }
         // if plan is not valid for transformation then return same plan
         if (!needAnalysis) {
           plan
         } else {
    -      // create buffer to collect all the column and its metadata information
    -      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    -      var isValidPlan = true
    -      val carbonTable = plan match {
    -        // matching the plan based on supported plan
    -        // if plan is matches with any case it will validate and get all
    -        // information required for transforming the plan
    +      val updatedPlan = transformPreAggQueryPlan(plan)
    +      val newPlan = updatePlan(updatedPlan)
    +      print(newPlan.toString())
    +      newPlan
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // subquery
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    +  /**
    +   * Below method will be used to update the child plan
    +   * This will be used for updating expression like join condition,
    +   * order by, project list etc
    +   * @param plan
    +   * child plan
    +   * @return updated plan
    +   */
    +  def updatePlan(plan: LogicalPlan) : LogicalPlan = {
    +    val updatedPlan = plan transform {
    +      case Aggregate(grp, aggExp, child) =>
    +        Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
    +      case Filter(filterExp, child) =>
    +        Filter(updateConditionExpression(Some(filterExp)).get, child)
    +      case Project(projectList, child) =>
    +        Project(updateNamedExpression(projectList), child)
    +      case Sort(sortOrders, global, child) =>
    +        Sort(updateSortExpression(sortOrders), global, child)
    +      case Join(left, right, joinType, condition) =>
    +        Join(left, right, joinType, updateConditionExpression(condition))
    +    }
    +    updatedPlan
    +  }
     
    -        // below case for handling filter query
    -        // When plan has grouping expression, aggregate expression
    -        // filter expression
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          // getting the columns from filter expression
    -          if(isValidPlan) {
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +  /**
    +   * Below method will be used to update the sort expression
    +   * @param sortExp
    +   * sort order expression in query
    +   * @return updated sort expression
    +   */
    +  def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
    +     sortExp map { order =>
    +      order.child match {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            SortOrder(newExpression, order.direction)
    +          } else {
    +            SortOrder(attr, order.direction)
               }
    -          carbonTable
    +      }
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // logical relation
    -        case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
    -          // only carbon query plan is supported checking whether logical relation is
    -          // is for carbon
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    -        // case for handling aggregation, order by
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -          }
    -          carbonTable
    -        // case for handling aggregation, order by and filter
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    +  /**
    +   * Below method will be used to update the expression like group by expression
    +   * @param expressions
    +   * sequence of expression like group by
    +   * @return updated expressions
    +   */
    +  def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
    +    val newExp = expressions map { expression =>
    +      expression transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +            newExpression
    +          } else {
    +            attr
               }
    -          if (isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    +      }
    +    }
    +    newExp
    +  }
    +
    +  /**
    +   * Below method will be used to updated the named expression like aggregate expression
    +   * @param namedExpression
    +   * any named expression like aggregate expression
    +   * @return updated named expression
    +   */
    +  def updateNamedExpression(namedExpression: Seq[NamedExpression]) : Seq[NamedExpression] = {
    +    namedExpression map {
    +      case attr: AttributeReference =>
    +        val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +        if(childExp.isDefined) {
    +          val newExp = AttributeReference(
    +            childExp.get._2.name,
    +            childExp.get._2.dataType,
    +            childExp.get._2.nullable,
    +            childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +          newExp
    +        } else {
    +          attr
    +        }
    +      case alias@Alias(exp, name) =>
    +        val newExp = exp.transform {
    +          case attr: AttributeReference =>
    +            val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +            if (childExp.isDefined) {
    +              val newExp = AttributeReference(
    +                childExp.get._2.name,
    +                childExp.get._2.dataType,
    +                childExp.get._2.nullable,
    +                childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated)
    +              newExp
    +            } else {
    +              attr
    +            }
    +        }
    +        Alias(newExp, name)(alias.exprId, alias.qualifier, Some(alias.metadata), alias.isGenerated)
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to updated condition expression
    +   * @param conditionExp
    +   * any condition expression join condition or filter condition
    +   * @return updated condition expression
    +   */
    +  def updateConditionExpression(conditionExp: Option[Expression]): Option[Expression] = {
    +    if (conditionExp.isDefined) {
    +      val filterExp = conditionExp.get
    +      Some(filterExp.transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +          if(childExp.isDefined) {
    +            childExp.get._2
    +          } else {
    +            attr
               }
    -          carbonTable
    -        // case for handling aggregation with order by when only projection column exits
    -        case Sort(sortOrders,
    -          _,
    -          Aggregate(groupingExp,
    -            aggregateExp,
    -            CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    +      })
    +    } else {
    +      conditionExp
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to validate and transform the main table plan to child table plan
    +   * rules for transforming is as below.
    +   * 1. Grouping expression rules
    +   *    1.1 Change the parent attribute reference for of group expression
    +   * to child attribute reference
    +   *
    +   * 2. Aggregate expression rules
    +   *    2.1 Change the parent attribute reference for of group expression to
    +   * child attribute reference
    +   *    2.2 Change the count AggregateExpression to Sum as count
    +   * is already calculated so in case of aggregate table
    +   * we need to apply sum to get the count
    +   *    2.2 In case of average aggregate function select 2 columns from aggregate table with
    +   * aggregation sum and count. Then add divide(sum(column with sum), sum(column with count)).
    +   * Note: During aggregate table creation for average table will be created with two columns
    +   * one for sum(column) and count(column) to support rollup
    +   * 3. Filter Expression rules.
    +   *    3.1 Updated filter expression attributes with child table attributes
    +   * 4. Update the Parent Logical relation with child Logical relation
    +   * 5. timeseries function
    +   *    5.1 validate parent table has timeseries datamap
    +   *    5.2 timeseries function is valid function or not
    +   *
    +   * @param logicalPlan
    +   * parent logical plan
    +   * @return transformed plan
    +   */
    +  def transformPreAggQueryPlan(logicalPlan: LogicalPlan): LogicalPlan = {
    +    val updatedPlan = logicalPlan.transform {
    +      // case for aggregation query
    +      case agg@Aggregate(grExp,
    +      aggExp,
    +      child@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    +        if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    +           logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    +             metaData.hasAggregateDataMapSchema =>
    +        val carbonTable = getCarbonTable(logicalRelation)
    +        val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    +        val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression]
    +        val isValidPlan = extractQueryColumnsFromAggExpression(
    +          grExp,
    +          aggExp,
    +          carbonTable,
    +          list,
    +          aggregateExpressions)
    +        if(isValidPlan) {
    +          val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list,
    +            aggregateExpressions,
                 carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            list ++ extractQueryColumnForOrderBy(sortOrders = sortOrders,
    -              carbonTable = carbonTable,
    -              tableName = tableName)
    +            logicalRelation)
    +          if(null != aggDataMapSchema && null!= childPlan) {
    +            val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]]
    +            val (updatedGroupExp, updatedAggExp, newChild, None) =
    +              getUpdatedExpressions(grExp,
    +                aggExp,
    +                child,
    +                None,
    +                aggDataMapSchema,
    +                attributes,
    +                childPlan,
    +                carbonTable,
    +                logicalRelation)
    +            Aggregate(updatedGroupExp,
    +              updatedAggExp,
    +              newChild)
    +          } else {
    +            agg
               }
    -          carbonTable
    -        // case for handling aggregation with order by and filter when only projection column exits
    -        case Sort(sortOrders,
    -          _,
    -          Aggregate(groupingExp,
    -            aggregateExp,
    -            Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
    -          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    +        } else {
    +          agg
    +        }
    +      // case of handling aggregation query with filter
    +      case agg@Aggregate(grExp,
    +      aggExp,
    +      Filter(expression, child@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    +        if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    +           logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    +             metaData.hasAggregateDataMapSchema =>
    +        val carbonTable = getCarbonTable(logicalRelation)
    +        val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    +        val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression]
    +        var isValidPlan = extractQueryColumnsFromAggExpression(
    +          grExp,
    +          aggExp,
    +          carbonTable,
    +          list,
    +          aggregateExpressions)
    +        // getting the columns from filter expression
    +        isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(expression)
    +        if (isValidPlan) {
    +          isValidPlan = extractQueryColumnFromFilterExp(expression, list, carbonTable)
    +        }
    +        if(isValidPlan) {
    +          val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list,
    +            aggregateExpressions,
                 carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          if(isValidPlan) {
    -            list ++ extractQueryColumnForOrderBy(sortOrders = sortOrders,
    -              carbonTable = carbonTable,
    -              tableName = tableName)
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
    -          }
    -          carbonTable
    -        case _ =>
    -          isValidPlan = false
    -          null
    -      }
    -      if (isValidPlan && null != carbonTable) {
    -        isValidPlan = isSpecificSegmentPresent(carbonTable)
    -      }
    -      // if plan is valid then update the plan with child attributes
    -      if (isValidPlan) {
    -        // getting all the projection columns
    -        val listProjectionColumn = list
    -          .filter(queryColumn => queryColumn.getAggFunction.isEmpty && !queryColumn.isFilterColumn)
    -          .toList
    -        // getting all the filter columns
    -        val listFilterColumn = list
    -          .filter(queryColumn => queryColumn.getAggFunction.isEmpty && queryColumn.isFilterColumn)
    -          .toList
    -        // getting all the aggregation columns
    -        val listAggregationColumn = list.filter(queryColumn => !queryColumn.getAggFunction.isEmpty)
    -          .toList
    -        // create a query plan object which will be used to select the list of pre aggregate tables
    -        // matches with this plan
    -        val queryPlan = new QueryPlan(listProjectionColumn.asJava,
    -          listAggregationColumn.asJava,
    -          listFilterColumn.asJava)
    -        // create aggregate table selector object
    -        val aggregateTableSelector = new AggregateTableSelector(queryPlan, carbonTable)
    -        // select the list of valid child tables
    -        val selectedDataMapSchemas = aggregateTableSelector.selectPreAggDataMapSchema()
    -        // if it does not match with any pre aggregate table return the same plan
    -        if (!selectedDataMapSchemas.isEmpty) {
    -          // filter the selected child schema based on size to select the pre-aggregate tables
    -          // that are nonEmpty
    -          val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
    -          val relationBuffer = selectedDataMapSchemas.asScala.map { selectedDataMapSchema =>
    -            val identifier = TableIdentifier(
    -              selectedDataMapSchema.getRelationIdentifier.getTableName,
    -              Some(selectedDataMapSchema.getRelationIdentifier.getDatabaseName))
    -            val carbonRelation =
    -              catalog.lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation]
    -            val relation = sparkSession.sessionState.catalog.lookupRelation(identifier)
    -            (selectedDataMapSchema, carbonRelation, relation)
    -          }.filter(_._2.sizeInBytes != 0L)
    -          if (relationBuffer.isEmpty) {
    -            // If the size of relation Buffer is 0 then it means that none of the pre-aggregate
    -            // tables have date yet.
    -            // In this case we would return the original plan so that the query hits the parent
    -            // table.
    -            plan
    +            logicalRelation)
    +          if(null != aggDataMapSchema && null!= childPlan) {
    +            val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]]
    +            val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) =
    +              getUpdatedExpressions(grExp,
    +                aggExp,
    +                child,
    +                Some(expression),
    +                aggDataMapSchema,
    +                attributes,
    +                childPlan,
    +                carbonTable,
    +                logicalRelation)
    +            Aggregate(updatedGroupExp,
    +              updatedAggExp,
    +              Filter(updatedFilterExpression.get,
    +                newChild))
               } else {
    -            // If the relationBuffer is nonEmpty then find the table with the minimum size.
    -            val (aggDataMapSchema, _, relation) = relationBuffer.minBy(_._2.sizeInBytes)
    -            val newRelation = new FindDataSourceTable(sparkSession).apply(relation)
    -            // transform the query plan based on selected child schema
    -            transformPreAggQueryPlan(plan, aggDataMapSchema, newRelation)
    +            agg
               }
             } else {
    -          plan
    +          agg
             }
    +    }
    +    updatedPlan
    +  }
    +
    +  /**
    +   * Below method will be used to validate query plan and get the proper aggregation data map schema
    +   * and child relation plan object if plan is valid for transformation
    +   * @param queryColumns
    +   * list of query columns from projection and filter
    +   * @param aggregateExpressions
    +   * list of aggregate expression (aggregate function)
    +   * @param carbonTable
    +   * parent carbon table
    +   * @param logicalRelation
    +   * parent logical relation
    +   * @return if plan is valid then aggregation data map schema and its relation plan
    +   */
    +  def getChildDataMapForTransformation(queryColumns: scala.collection.mutable.HashSet[QueryColumn],
    +      aggregateExpressions: scala.collection.mutable.HashSet[AggregateExpression],
    +      carbonTable: CarbonTable,
    +      logicalRelation: LogicalRelation): (AggregationDataMapSchema, LogicalPlan) = {
    +    // getting all the projection columns
    +    val listProjectionColumn = queryColumns
    +      .filter(queryColumn => !queryColumn.isFilterColumn)
    +      .toList
    +    // getting all the filter columns
    +    val listFilterColumn = queryColumns
    +      .filter(queryColumn => queryColumn.isFilterColumn)
    +      .toList
    +    val isProjectionColumnPresent = (listProjectionColumn.size + listFilterColumn.size) > 0
    +    // create a query plan object which will be used to select the list of pre aggregate tables
    +    // matches with this plan
    +    val queryPlan = new QueryPlan(listProjectionColumn.asJava, listFilterColumn.asJava)
    +    // create aggregate table selector object
    +    val aggregateTableSelector = new AggregateTableSelector(queryPlan, carbonTable)
    +    // select the list of valid child tables
    +    val selectedDataMapSchemas = aggregateTableSelector.selectPreAggDataMapSchema()
    +    // query has only aggregate expression then selected data map will be empty
    +    // the validate all the child data map otherwise validate selected data map
    +    var selectedAggMaps = if (isProjectionColumnPresent) {
    +      selectedDataMapSchemas
    +    } else {
    +      carbonTable.getTableInfo.getDataMapSchemaList
    +    }
    +    val aggExpLogicalPlans = aggregateExpressions.map { queryAggExp =>
    +      PreAggregateUtil.getLogicalPlanFromAggExp(queryAggExp,
    +        carbonTable.getTableName,
    +        carbonTable.getDatabaseName,
    +        logicalRelation,
    +        sparkSession,
    +        parser)
    +    }.toSeq
    +    // if query does not have any aggregate function no need to validate the same
    +    if (aggregateExpressions.size > 0 && selectedAggMaps.size > 0) {
    +      selectedAggMaps = validateAggregateExpression(selectedAggMaps.asScala.toSeq,
    +        carbonTable,
    +        logicalRelation,
    +        aggExpLogicalPlans).asJava
    +    }
    +    // if it does not match with any pre aggregate table return the same plan
    +    if (!selectedAggMaps.isEmpty) {
    +      // filter the selected child schema based on size to select the pre-aggregate tables
    +      // that are nonEmpty
    +      val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
    +      val relationBuffer = selectedAggMaps.asScala.map { selectedDataMapSchema =>
    +        val identifier = TableIdentifier(
    +          selectedDataMapSchema.getRelationIdentifier.getTableName,
    +          Some(selectedDataMapSchema.getRelationIdentifier.getDatabaseName))
    +        val carbonRelation =
    +          catalog.lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation]
    +        val relation = sparkSession.sessionState.catalog.lookupRelation(identifier)
    +        (selectedDataMapSchema, carbonRelation, relation)
    +      }.filter(_._2.sizeInBytes != 0L)
    +      if (relationBuffer.isEmpty) {
    +        // If the size of relation Buffer is 0 then it means that none of the pre-aggregate
    +        // tables have date yet.
    +        // In this case we would return the original plan so that the query hits the parent
    +        // table.
    +        (null, null)
           } else {
    -        plan
    +        // If the relationBuffer is nonEmpty then find the table with the minimum size.
    +        val (aggDataMapSchema, _, relation) = relationBuffer.minBy(_._2.sizeInBytes)
    +        val newRelation = new FindDataSourceTable(sparkSession).apply(relation)
    +        (aggDataMapSchema.asInstanceOf[AggregationDataMapSchema], newRelation)
           }
    +    } else {
    +      (null, null)
         }
       }
     
    +  /**
    +   * Below method will be used to validate aggregate expression with the data map
    +   * and will return the selected valid data maps
    +   * @param selectedDataMap
    +   *                        list of data maps
    +   * @param carbonTable
    +   *                    parent carbon table
    +   * @param logicalRelation
    +   *                        parent logical relation
    +   * @param queryAggExpLogicalPlans
    +   *                                query agg expression logical plan
    +   * @return valid data map
    +   */
    +  def validateAggregateExpression(selectedDataMap: Seq[DataMapSchema],
    +      carbonTable: CarbonTable,
    +      logicalRelation: LogicalRelation,
    +      queryAggExpLogicalPlans: Seq[LogicalPlan]): Seq[DataMapSchema] = {
    +    def validateDataMap(dataMap: DataMapSchema,
    +        aggExpLogicalPlans: Seq[LogicalPlan]): Boolean = {
    +      val mappingModel = getLogicalPlanForAggregateExpression(dataMap,
    +        carbonTable,
    +        logicalRelation)
    +      aggExpLogicalPlans.forall{
    +        p => mappingModel.exists(m => p.sameResult(m.logicalPlan))
    --- End diff --
   
    move p => to up


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2674/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggreg...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159609322
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -1218,112 +1180,125 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
        * parent column name
        * @param carbonTable
        * parent carbon table
    -   * @param tableName
    -   * parent table name
    -   * @param aggFunction
    -   * aggregate function applied
    -   * @param dataType
    -   * data type of the column
    -   * @param isChangedDataType
    -   * is cast is applied on column
        * @param isFilterColumn
        * is filter is applied on column
        * @return query column
        */
       def getQueryColumn(columnName: String,
           carbonTable: CarbonTable,
    -      tableName: String,
    -      aggFunction: String = "",
    -      dataType: String = "",
    -      isChangedDataType: Boolean = false,
           isFilterColumn: Boolean = false,
           timeseriesFunction: String = ""): QueryColumn = {
    -    val columnSchema = carbonTable.getColumnByName(tableName, columnName.toLowerCase)
    +    val columnSchema = carbonTable.getColumnByName(carbonTable.getTableName, columnName.toLowerCase)
         if(null == columnSchema) {
           null
         } else {
    -      if (isChangedDataType) {
             new QueryColumn(columnSchema.getColumnSchema,
    -          columnSchema.getDataType.getName,
    -          aggFunction.toLowerCase,
               isFilterColumn,
               timeseriesFunction.toLowerCase)
    -      } else {
    -        new QueryColumn(columnSchema.getColumnSchema,
    -          CarbonScalaUtil.convertSparkToCarbonSchemaDataType(dataType),
    -          aggFunction.toLowerCase,
    -          isFilterColumn,
    -          timeseriesFunction.toLowerCase)
    -      }
         }
       }
     }
     
    -object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] {
    -
    +/**
    + * Data loading rule class to validate and update the data loading query plan
    + * Validation rule:
    + * 1. update the avg aggregate expression with two columns sum and count
    + * 2. Remove duplicate sum and count expression if already there in plan
    + * @param sparkSession
    + * spark session
    + */
    +case class CarbonPreAggregateDataLoadingRules(sparkSession: SparkSession)
    +  extends Rule[LogicalPlan] {
    +  lazy val parser = new CarbonSpark2SqlParser
       override def apply(plan: LogicalPlan): LogicalPlan = {
    -    val validExpressionsMap = scala.collection.mutable.LinkedHashMap.empty[String, NamedExpression]
    +    val validExpressionsMap = scala.collection.mutable.HashSet.empty[AggExpToColumnMappingModel]
    +    val namedExpressionList = scala.collection.mutable.ListBuffer.empty[NamedExpression]
         plan transform {
    -      case aggregate@Aggregate(_, aExp, _) if validateAggregateExpressions(aExp) =>
    +      case aggregate@Aggregate(_,
    +      aExp,
    +      CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    +        if validateAggregateExpressions(aExp) &&
    +        logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
    +        val carbonTable = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
    +          .carbonTable
             aExp.foreach {
    -          case alias: Alias =>
    -            validExpressionsMap ++= validateAggregateFunctionAndGetAlias(alias)
    -          case _: UnresolvedAlias =>
    -          case namedExpr: NamedExpression => validExpressionsMap.put(namedExpr.name, namedExpr)
    +          case attr: AttributeReference =>
    +              namedExpressionList += attr
    +            case alias@Alias(_: AttributeReference, _) =>
    +              namedExpressionList += alias
    +            case alias@Alias(aggExp: AggregateExpression, name) =>
    +              // get the updated expression for avg convert it to two expression
    +              // sum and count
    +              val expressions = PreAggregateUtil.getUpdateAggregateExpressions(aggExp)
    +              // if size is more than one then it was for average
    +              if(expressions.size > 1) {
    +                // get the logical plan for sum expression
    +                val logicalPlan_sum = PreAggregateUtil.getLogicalPlanFromAggExp(
    +                  expressions.head,
    +                  carbonTable.getTableName,
    +                  carbonTable.getDatabaseName,
    +                  logicalRelation,
    +                  sparkSession,
    +                  parser)
    +                // get the logical plan fro count expression
    +                val logicalPlan_count = PreAggregateUtil.getLogicalPlanFromAggExp(
    +                  expressions.last,
    +                  carbonTable.getTableName,
    +                  carbonTable.getDatabaseName,
    +                  logicalRelation,
    +                  sparkSession,
    +                  parser)
    +                // check with same expression already sum is present then do not add to
    +                // named expression list otherwise update the list and add it to set
    +                if (!validExpressionsMap.contains(AggExpToColumnMappingModel(logicalPlan_sum))) {
    +                  namedExpressionList +=
    +                  Alias(expressions.head, name + " _ sum")(NamedExpression.newExprId,
    +                    alias.qualifier,
    +                    Some(alias.metadata),
    +                    alias.isGenerated)
    +                  validExpressionsMap += AggExpToColumnMappingModel(logicalPlan_sum)
    +                }
    +                // check with same expression already count is present then do not add to
    +                // named expression list otherwise update the list and add it to set
    +                if (!validExpressionsMap.contains(AggExpToColumnMappingModel(logicalPlan_count))) {
    +                  namedExpressionList +=
    +                  Alias(expressions.last, name + " _ count")(NamedExpression.newExprId,
    +                    alias.qualifier,
    +                    Some(alias.metadata),
    +                    alias.isGenerated)
    +                  validExpressionsMap += AggExpToColumnMappingModel(logicalPlan_count)
    +                }
    +              } else {
    +                // get the logical plan for expression
    +                val logicalPlan = PreAggregateUtil.getLogicalPlanFromAggExp(
    +                  expressions.head,
    +                  carbonTable.getTableName,
    +                  carbonTable.getDatabaseName,
    +                  logicalRelation,
    +                  sparkSession,
    +                  parser)
    +                // check with same expression already  present then do not add to
    +                // named expression list otherwise update the list and add it to set
    +                if (!validExpressionsMap.contains(AggExpToColumnMappingModel(logicalPlan))) {
    +                  namedExpressionList+=alias
    +                  validExpressionsMap += AggExpToColumnMappingModel(logicalPlan)
    +                }
    +              }
    +            case alias@Alias(_: Expression, _) =>
    +              namedExpressionList += alias
             }
    -        aggregate.copy(aggregateExpressions = validExpressionsMap.values.toSeq)
    +        aggregate.copy(aggregateExpressions = namedExpressionList.toSeq)
           case plan: LogicalPlan => plan
         }
       }
     
    -    /**
    -     * This method will split the avg column into sum and count and will return a sequence of tuple
    -     * of unique name, alias
    -     *
    -     */
    -    private def validateAggregateFunctionAndGetAlias(alias: Alias): Seq[(String,
    -      NamedExpression)] = {
    -      alias match {
    -        case udf@Alias(_: ScalaUDF, name) =>
    -          Seq((name, udf))
    -        case alias@Alias(attrExpression: AggregateExpression, _) =>
    -          attrExpression.aggregateFunction match {
    -            case Sum(attr: AttributeReference) =>
    -              (attr.name + "_sum", alias) :: Nil
    -            case Sum(MatchCastExpression(attr: AttributeReference, _)) =>
    -              (attr.name + "_sum", alias) :: Nil
    -            case Count(Seq(attr: AttributeReference)) =>
    -              (attr.name + "_count", alias) :: Nil
    -            case Count(Seq(MatchCastExpression(attr: AttributeReference, _))) =>
    -              (attr.name + "_count", alias) :: Nil
    -            case Average(attr: AttributeReference) =>
    -              Seq((attr.name + "_sum", Alias(attrExpression.
    -                copy(aggregateFunction = Sum(attr),
    -                  resultId = NamedExpression.newExprId), attr.name + "_sum")()),
    -                (attr.name, Alias(attrExpression.
    -                  copy(aggregateFunction = Count(attr),
    -                    resultId = NamedExpression.newExprId), attr.name + "_count")()))
    -            case Average(cast@MatchCastExpression(attr: AttributeReference, _)) =>
    -              Seq((attr.name + "_sum", Alias(attrExpression.
    -                copy(aggregateFunction = Sum(cast),
    -                  resultId = NamedExpression.newExprId),
    -                attr.name + "_sum")()),
    -                (attr.name, Alias(attrExpression.
    -                  copy(aggregateFunction = Count(cast), resultId =
    -                    NamedExpression.newExprId), attr.name + "_count")()))
    -            case _ => Seq(("", alias))
    -          }
    -
    -      }
    -    }
    -
       /**
        * Called by PreAggregateLoadingRules to validate if plan is valid for applying rules or not.
        * If the plan has PreAggLoad i.e Loading UDF and does not have PreAgg i.e Query UDF then it is
        * valid.
    -   *
        * @param namedExpression
    -   * @return
    +   * named expressions
    --- End diff --
   
    move it up. comment should like
    `@param namedExpression named expressions`


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2551/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1326/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2562/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2722/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2566/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2568/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2732/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2733/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2734/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2735/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2738/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kumarvishal09 commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    retest this please


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1338/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2575/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2577/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #1728: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Ex...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1728
 
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1343/



---
1234