Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159458812 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule needAnalysis = false attr } + if(needAnalysis) { + needAnalysis = isValidPlan(plan) + } // if plan is not valid for transformation then return same plan if (!needAnalysis) { plan } else { - // create buffer to collect all the column and its metadata information - val list = scala.collection.mutable.HashSet.empty[QueryColumn] - var isValidPlan = true - val carbonTable = plan match { - // matching the plan based on supported plan - // if plan is matches with any case it will validate and get all - // information required for transforming the plan + val updatedPlan = transformPreAggQueryPlan(plan) + val newPlan = updatePlan(updatedPlan) + print(newPlan.toString()) + newPlan + } + } - // When plan has grouping expression, aggregate expression - // subquery - case Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - carbonTable + /** + * Below method will be used to update the child plan + * This will be used for updating expression like join condition, + * order by, project list etc + * @param plan + * child plan + * @return updated plan + */ + def updatePlan(plan: LogicalPlan) : LogicalPlan = { + val updatedPlan = plan transform { + case Aggregate(grp, aggExp, child) => + Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child) + case Filter(filterExp, child) => + Filter(updateConditionExpression(Some(filterExp)).get, child) + case Project(projectList, child) => + Project(updateNamedExpression(projectList), child) + case Sort(sortOrders, global, child) => + Sort(updateSortExpression(sortOrders), global, child) + case Join(left, right, joinType, condition) => + Join(left, right, joinType, updateConditionExpression(condition)) + } + updatedPlan + } - // below case for handling filter query - // When plan has grouping expression, aggregate expression - // filter expression - case Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) - } - // getting the columns from filter expression - if(isValidPlan) { - isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) + /** + * Below method will be used to update the sort expression + * @param sortExp + * sort order expression in query + * @return updated sort expression + */ + def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = { + sortExp map { order => + order.child match { + case attr: AttributeReference => + val childExp = updatedExpression.find { p => p._1.sameRef(attr) } + if (childExp.isDefined) { + val newExpression = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + SortOrder(newExpression, order.direction) + } else { + SortOrder(attr, order.direction) } - carbonTable + } + } + } - // When plan has grouping expression, aggregate expression - // logical relation - case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - carbonTable - // case for handling aggregation, order by - case Project(projectList, - Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - list ++ - extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName) - } - carbonTable - // case for handling aggregation, order by and filter - case Project(projectList, - Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) + /** + * Below method will be used to update the expression like group by expression + * @param expressions + * sequence of expression like group by + * @return updated expressions + */ + def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = { + val newExp = expressions map { expression => + expression transform { + case attr: AttributeReference => + val childExp = updatedExpression.find { p => p._1.sameRef(attr) } + if (childExp.isDefined) { + val newExpression = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + newExpression + } else { + attr } - if (isValidPlan) { - list ++ - extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName) - isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) + } + } + newExp + } + + /** + * Below method will be used to updated the named expression like aggregate expression + * @param namedExpression + * any named expression like aggregate expression + * @return updated named expression + */ + def updateNamedExpression(namedExpression: Seq[NamedExpression]) : Seq[NamedExpression] = { + namedExpression map { + case attr: AttributeReference => + val childExp = updatedExpression.find(p => p._1.sameRef(attr)) + if(childExp.isDefined) { + val newExp = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + newExp + } else { + attr + } + case alias@Alias(exp, name) => + val 
newExp = exp.transform { + case attr: AttributeReference => + val childExp = updatedExpression.find(p => p._1.sameRef(attr)) + if (childExp.isDefined) { + val newExp = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + newExp + } else { + attr + } + } + Alias(newExp, name)(alias.exprId, alias.qualifier, Some(alias.metadata), alias.isGenerated) + } + } + + /** + * Below method will be used to updated condition expression + * @param conditionExp + * any condition expression join condition or filter condition + * @return updated condition expression + */ + def updateConditionExpression(conditionExp: Option[Expression]): Option[Expression] = { + if (conditionExp.isDefined) { + val filterExp = conditionExp.get + Some(filterExp.transform { + case attr: AttributeReference => + val childExp = updatedExpression.find(p => p._1.sameRef(attr)) + if(childExp.isDefined) { + childExp.get._2 + } else { + attr } - carbonTable - // case for handling aggregation with order by when only projection column exits - case Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, + }) + } else { + conditionExp + } + } + + /** + * Below method will be used to validate and transform the main table plan to child table plan + * rules for transforming is as below. + * 1. Grouping expression rules + * 1.1 Change the parent attribute reference for of group expression + * to child attribute reference + * + * 2. 
Aggregate expression rules + * 2.1 Change the parent attribute reference for of group expression to + * child attribute reference + * 2.2 Change the count AggregateExpression to Sum as count + * is already calculated so in case of aggregate table + * we need to apply sum to get the count + * 2.2 In case of average aggregate function select 2 columns from aggregate table with + * aggregation sum and count. Then add divide(sum(column with sum), sum(column with count)). + * Note: During aggregate table creation for average table will be created with two columns + * one for sum(column) and count(column) to support rollup + * 3. Filter Expression rules. + * 3.1 Updated filter expression attributes with child table attributes + * 4. Update the Parent Logical relation with child Logical relation + * 5. timeseries function + * 5.1 validate parent table has timeseries datamap + * 5.2 timeseries function is valid function or not + * + * @param logicalPlan + * parent logical plan + * @return transformed plan + */ + def transformPreAggQueryPlan(logicalPlan: LogicalPlan): LogicalPlan = { + val updatedPlan = logicalPlan.transform { + // case for aggregation query + case agg@Aggregate(grExp, + aggExp, + child@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
+ metaData.hasAggregateDataMapSchema => + val carbonTable = getCarbonTable(logicalRelation) + val list = scala.collection.mutable.HashSet.empty[QueryColumn] + val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression] + val isValidPlan = extractQueryColumnsFromAggExpression( + grExp, + aggExp, + carbonTable, + list, + aggregateExpressions) + if(isValidPlan) { + val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list, + aggregateExpressions, carbonTable, - tableName, - list) - if(isValidPlan) { - list ++ extractQueryColumnForOrderBy(sortOrders = sortOrders, - carbonTable = carbonTable, - tableName = tableName) + logicalRelation) + if(null != aggDataMapSchema && null!= childPlan) { + val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]] + val (updatedGroupExp, updatedAggExp, newChild, None) = + getUpdatedExpressions(grExp, + aggExp, + child, + None, + aggDataMapSchema, + attributes, + childPlan, + carbonTable, + logicalRelation) + Aggregate(updatedGroupExp, + updatedAggExp, + newChild) + } else { + agg } - carbonTable - // case for handling aggregation with order by and filter when only projection column exits - case Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, + } else { + agg + } + // case of handling aggregation query with filter + case agg@Aggregate(grExp, --- End diff -- What about plans that contain a Sort (ORDER BY)? How are those handled here? --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159459370 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule needAnalysis = false attr } + if(needAnalysis) { + needAnalysis = isValidPlan(plan) + } // if plan is not valid for transformation then return same plan if (!needAnalysis) { plan } else { - // create buffer to collect all the column and its metadata information - val list = scala.collection.mutable.HashSet.empty[QueryColumn] - var isValidPlan = true - val carbonTable = plan match { - // matching the plan based on supported plan - // if plan is matches with any case it will validate and get all - // information required for transforming the plan + val updatedPlan = transformPreAggQueryPlan(plan) + val newPlan = updatePlan(updatedPlan) + print(newPlan.toString()) + newPlan + } + } - // When plan has grouping expression, aggregate expression - // subquery - case Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - carbonTable + /** + * Below method will be used to update the child plan + * This will be used for updating expression like join condition, + * order by, project list etc + * @param plan + * child plan + * @return updated plan + */ + def updatePlan(plan: LogicalPlan) : LogicalPlan = { + val updatedPlan = plan transform { + case Aggregate(grp, aggExp, child) => + Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child) + case Filter(filterExp, child) => + Filter(updateConditionExpression(Some(filterExp)).get, child) + case Project(projectList, child) => + Project(updateNamedExpression(projectList), child) + case Sort(sortOrders, global, child) => + Sort(updateSortExpression(sortOrders), global, child) + case Join(left, right, joinType, condition) => + Join(left, right, joinType, updateConditionExpression(condition)) + } + updatedPlan + } - // below case for handling filter query - // When plan has grouping expression, aggregate expression - // filter expression - case Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) - } - // getting the columns from filter expression - if(isValidPlan) { - isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) + /** + * Below method will be used to update the sort expression + * @param sortExp + * sort order expression in query + * @return updated sort expression + */ + def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = { + sortExp map { order => + order.child match { + case attr: AttributeReference => + val childExp = updatedExpression.find { p => p._1.sameRef(attr) } + if (childExp.isDefined) { + val newExpression = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + SortOrder(newExpression, order.direction) + } else { + SortOrder(attr, order.direction) } - carbonTable + } + } + } - // When plan has grouping expression, aggregate expression - // logical relation - case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - carbonTable - // case for handling aggregation, order by - case Project(projectList, - Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - list ++ - extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName) - } - carbonTable - // case for handling aggregation, order by and filter - case Project(projectList, - Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) + /** + * Below method will be used to update the expression like group by expression + * @param expressions + * sequence of expression like group by + * @return updated expressions + */ + def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = { + val newExp = expressions map { expression => + expression transform { + case attr: AttributeReference => + val childExp = updatedExpression.find { p => p._1.sameRef(attr) } + if (childExp.isDefined) { + val newExpression = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + newExpression + } else { + attr } - if (isValidPlan) { - list ++ - extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName) - isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) + } + } + newExp + } + + /** + * Below method will be used to updated the named expression like aggregate expression + * @param namedExpression + * any named expression like aggregate expression + * @return updated named expression + */ + def updateNamedExpression(namedExpression: Seq[NamedExpression]) : Seq[NamedExpression] = { + namedExpression map { + case attr: AttributeReference => + val childExp = updatedExpression.find(p => p._1.sameRef(attr)) + if(childExp.isDefined) { + val newExp = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + newExp + } else { + attr + } + case alias@Alias(exp, name) => + val 
newExp = exp.transform { + case attr: AttributeReference => + val childExp = updatedExpression.find(p => p._1.sameRef(attr)) + if (childExp.isDefined) { + val newExp = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + newExp + } else { + attr + } + } + Alias(newExp, name)(alias.exprId, alias.qualifier, Some(alias.metadata), alias.isGenerated) + } + } + + /** + * Below method will be used to updated condition expression + * @param conditionExp + * any condition expression join condition or filter condition + * @return updated condition expression + */ + def updateConditionExpression(conditionExp: Option[Expression]): Option[Expression] = { + if (conditionExp.isDefined) { + val filterExp = conditionExp.get + Some(filterExp.transform { + case attr: AttributeReference => + val childExp = updatedExpression.find(p => p._1.sameRef(attr)) + if(childExp.isDefined) { + childExp.get._2 + } else { + attr } - carbonTable - // case for handling aggregation with order by when only projection column exits - case Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, + }) + } else { + conditionExp + } + } + + /** + * Below method will be used to validate and transform the main table plan to child table plan + * rules for transforming is as below. + * 1. Grouping expression rules + * 1.1 Change the parent attribute reference for of group expression + * to child attribute reference + * + * 2. 
Aggregate expression rules + * 2.1 Change the parent attribute reference for of group expression to + * child attribute reference + * 2.2 Change the count AggregateExpression to Sum as count + * is already calculated so in case of aggregate table + * we need to apply sum to get the count + * 2.2 In case of average aggregate function select 2 columns from aggregate table with + * aggregation sum and count. Then add divide(sum(column with sum), sum(column with count)). + * Note: During aggregate table creation for average table will be created with two columns + * one for sum(column) and count(column) to support rollup + * 3. Filter Expression rules. + * 3.1 Updated filter expression attributes with child table attributes + * 4. Update the Parent Logical relation with child Logical relation + * 5. timeseries function + * 5.1 validate parent table has timeseries datamap + * 5.2 timeseries function is valid function or not + * + * @param logicalPlan + * parent logical plan + * @return transformed plan + */ + def transformPreAggQueryPlan(logicalPlan: LogicalPlan): LogicalPlan = { + val updatedPlan = logicalPlan.transform { + // case for aggregation query + case agg@Aggregate(grExp, + aggExp, + child@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
+ metaData.hasAggregateDataMapSchema => + val carbonTable = getCarbonTable(logicalRelation) + val list = scala.collection.mutable.HashSet.empty[QueryColumn] + val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression] + val isValidPlan = extractQueryColumnsFromAggExpression( + grExp, + aggExp, + carbonTable, + list, + aggregateExpressions) + if(isValidPlan) { + val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list, + aggregateExpressions, carbonTable, - tableName, - list) - if(isValidPlan) { - list ++ extractQueryColumnForOrderBy(sortOrders = sortOrders, - carbonTable = carbonTable, - tableName = tableName) + logicalRelation) + if(null != aggDataMapSchema && null!= childPlan) { + val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]] + val (updatedGroupExp, updatedAggExp, newChild, None) = + getUpdatedExpressions(grExp, + aggExp, + child, + None, + aggDataMapSchema, + attributes, + childPlan, + carbonTable, + logicalRelation) + Aggregate(updatedGroupExp, + updatedAggExp, + newChild) + } else { + agg } - carbonTable - // case for handling aggregation with order by and filter when only projection column exits - case Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, + } else { + agg + } + // case of handling aggregation query with filter + case agg@Aggregate(grExp, + aggExp, + Filter(expression, child@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. + metaData.hasAggregateDataMapSchema => + val carbonTable = getCarbonTable(logicalRelation) + val list = scala.collection.mutable.HashSet.empty[QueryColumn] + val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression] + var isValidPlan = extractQueryColumnsFromAggExpression( + grExp, + aggExp, + carbonTable, + list, + aggregateExpressions) + // getting the columns from filter expression + isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(expression) + if (isValidPlan) { + isValidPlan = extractQueryColumnFromFilterExp(expression, list, carbonTable) + } + if(isValidPlan) { + val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list, + aggregateExpressions, carbonTable, - tableName, - list) - if(isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) - } - if(isValidPlan) { - list ++ extractQueryColumnForOrderBy(sortOrders = sortOrders, - carbonTable = carbonTable, - tableName = tableName) - isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) - } - carbonTable - case _ => - isValidPlan = false - null - } - if (isValidPlan && null != carbonTable) { - isValidPlan = isSpecificSegmentPresent(carbonTable) - } - // if plan is valid then update the plan with child attributes - if (isValidPlan) { - // getting all the projection columns - val listProjectionColumn = list - .filter(queryColumn => 
queryColumn.getAggFunction.isEmpty && !queryColumn.isFilterColumn) - .toList - // getting all the filter columns - val listFilterColumn = list - .filter(queryColumn => queryColumn.getAggFunction.isEmpty && queryColumn.isFilterColumn) - .toList - // getting all the aggregation columns - val listAggregationColumn = list.filter(queryColumn => !queryColumn.getAggFunction.isEmpty) - .toList - // create a query plan object which will be used to select the list of pre aggregate tables - // matches with this plan - val queryPlan = new QueryPlan(listProjectionColumn.asJava, - listAggregationColumn.asJava, - listFilterColumn.asJava) - // create aggregate table selector object - val aggregateTableSelector = new AggregateTableSelector(queryPlan, carbonTable) - // select the list of valid child tables - val selectedDataMapSchemas = aggregateTableSelector.selectPreAggDataMapSchema() - // if it does not match with any pre aggregate table return the same plan - if (!selectedDataMapSchemas.isEmpty) { - // filter the selected child schema based on size to select the pre-aggregate tables - // that are nonEmpty - val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore - val relationBuffer = selectedDataMapSchemas.asScala.map { selectedDataMapSchema => - val identifier = TableIdentifier( - selectedDataMapSchema.getRelationIdentifier.getTableName, - Some(selectedDataMapSchema.getRelationIdentifier.getDatabaseName)) - val carbonRelation = - catalog.lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation] - val relation = sparkSession.sessionState.catalog.lookupRelation(identifier) - (selectedDataMapSchema, carbonRelation, relation) - }.filter(_._2.sizeInBytes != 0L) - if (relationBuffer.isEmpty) { - // If the size of relation Buffer is 0 then it means that none of the pre-aggregate - // tables have date yet. - // In this case we would return the original plan so that the query hits the parent - // table. 
- plan + logicalRelation) + if(null != aggDataMapSchema && null!= childPlan) { + val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]] + val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) = + getUpdatedExpressions(grExp, + aggExp, + child, + Some(expression), + aggDataMapSchema, + attributes, + childPlan, + carbonTable, + logicalRelation) + Aggregate(updatedGroupExp, + updatedAggExp, + Filter(updatedFilterExpression.get, + newChild)) } else { - // If the relationBuffer is nonEmpty then find the table with the minimum size. - val (aggDataMapSchema, _, relation) = relationBuffer.minBy(_._2.sizeInBytes) - val newRelation = new FindDataSourceTable(sparkSession).apply(relation) - // transform the query plan based on selected child schema - transformPreAggQueryPlan(plan, aggDataMapSchema, newRelation) + agg } } else { - plan + agg } + } + updatedPlan + } + + /** + * Below method will be used to validate query plan and get the proper aggregation data map schema + * and child relation plan object if plan is valid for transformation + * @param queryColumns + * list of query columns from projection and filter + * @param aggregateExpressions + * list of aggregate expression (aggregate function) + * @param carbonTable + * parent carbon table + * @param logicalRelation + * parent logical relation + * @return if plan is valid then aggregation data map schema and its relation plan + */ + def getChildDataMapForTransformation(queryColumns: scala.collection.mutable.HashSet[QueryColumn], + aggregateExpressions: scala.collection.mutable.HashSet[AggregateExpression], + carbonTable: CarbonTable, + logicalRelation: LogicalRelation): (AggregationDataMapSchema, LogicalPlan) = { + // getting all the projection columns + val listProjectionColumn = queryColumns + .filter(queryColumn => !queryColumn.isFilterColumn) + .toList + // getting all the filter columns + val listFilterColumn = queryColumns + .filter(queryColumn => queryColumn.isFilterColumn) + 
.toList + val isProjectionColumnPresent = (listProjectionColumn.size + listFilterColumn.size) > 0 + // create a query plan object which will be used to select the list of pre aggregate tables + // matches with this plan + val queryPlan = new QueryPlan(listProjectionColumn.asJava, listFilterColumn.asJava) + // create aggregate table selector object + val aggregateTableSelector = new AggregateTableSelector(queryPlan, carbonTable) + // select the list of valid child tables + val selectedDataMapSchemas = aggregateTableSelector.selectPreAggDataMapSchema() + // query has only aggregate expression then selected data map will be empty + // the validate all the child data map otherwise validate selected data map + var selectedAggMaps = if (isProjectionColumnPresent) { + selectedDataMapSchemas + } else { + carbonTable.getTableInfo.getDataMapSchemaList + } + val aggExpLogicalPlans = aggregateExpressions.map { queryAggExp => + PreAggregateUtil.getLogicalPlanFromAggExp(queryAggExp, + carbonTable.getTableName, + carbonTable.getDatabaseName, + logicalRelation, + sparkSession, + parser) + }.toSeq + // if query does not have any aggregate function no need to validate the same + if (aggregateExpressions.size > 0 && selectedAggMaps.size > 0) { + selectedAggMaps = validateAggregateExpression(selectedAggMaps.asScala.toSeq, + carbonTable, + logicalRelation, + aggExpLogicalPlans).asJava + } + // if it does not match with any pre aggregate table return the same plan + if (!selectedAggMaps.isEmpty) { + // filter the selected child schema based on size to select the pre-aggregate tables + // that are nonEmpty + val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore + val relationBuffer = selectedAggMaps.asScala.map { selectedDataMapSchema => + val identifier = TableIdentifier( + selectedDataMapSchema.getRelationIdentifier.getTableName, + Some(selectedDataMapSchema.getRelationIdentifier.getDatabaseName)) + val carbonRelation = + 
catalog.lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation] + val relation = sparkSession.sessionState.catalog.lookupRelation(identifier) + (selectedDataMapSchema, carbonRelation, relation) + }.filter(_._2.sizeInBytes != 0L) + if (relationBuffer.isEmpty) { + // If the size of relation Buffer is 0 then it means that none of the pre-aggregate + // tables have date yet. + // In this case we would return the original plan so that the query hits the parent + // table. + (null, null) } else { - plan + // If the relationBuffer is nonEmpty then find the table with the minimum size. + val (aggDataMapSchema, _, relation) = relationBuffer.minBy(_._2.sizeInBytes) + val newRelation = new FindDataSourceTable(sparkSession).apply(relation) + (aggDataMapSchema.asInstanceOf[AggregationDataMapSchema], newRelation) } + } else { + (null, null) } } + /** + * Below method will be used to validate aggregate expression with the data map + * and will return the selected valid data maps + * @param selectedDataMap + * list of data maps + * @param carbonTable + * parent carbon table + * @param logicalRelation + * parent logical relation + * @param queryAggExpLogicalPlans + * query agg expression logical plan + * @return valid data map + */ + def validateAggregateExpression(selectedDataMap: Seq[DataMapSchema], + carbonTable: CarbonTable, + logicalRelation: LogicalRelation, + queryAggExpLogicalPlans: Seq[LogicalPlan]): Seq[DataMapSchema] = { + def validateDataMap(dataMap: DataMapSchema, + aggExpLogicalPlans: Seq[LogicalPlan]): Boolean = { + val mappingModel = getLogicalPlanForAggregateExpression(dataMap, + carbonTable, + logicalRelation) + aggExpLogicalPlans.forall{ + p => mappingModel.exists(m => p.sameResult(m.logicalPlan)) --- End diff -- move p => to up --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1728 SDV build failed. Please check CI: http://144.76.159.231:8080/job/ApacheSDVTests/2674/ --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159609322 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -1218,112 +1180,125 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule * parent column name * @param carbonTable * parent carbon table - * @param tableName - * parent table name - * @param aggFunction - * aggregate function applied - * @param dataType - * data type of the column - * @param isChangedDataType - * is cast is applied on column * @param isFilterColumn * is filter is applied on column * @return query column */ def getQueryColumn(columnName: String, carbonTable: CarbonTable, - tableName: String, - aggFunction: String = "", - dataType: String = "", - isChangedDataType: Boolean = false, isFilterColumn: Boolean = false, timeseriesFunction: String = ""): QueryColumn = { - val columnSchema = carbonTable.getColumnByName(tableName, columnName.toLowerCase) + val columnSchema = carbonTable.getColumnByName(carbonTable.getTableName, columnName.toLowerCase) if(null == columnSchema) { null } else { - if (isChangedDataType) { new QueryColumn(columnSchema.getColumnSchema, - columnSchema.getDataType.getName, - aggFunction.toLowerCase, isFilterColumn, timeseriesFunction.toLowerCase) - } else { - new QueryColumn(columnSchema.getColumnSchema, - CarbonScalaUtil.convertSparkToCarbonSchemaDataType(dataType), - aggFunction.toLowerCase, - isFilterColumn, - timeseriesFunction.toLowerCase) - } } } } -object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] { - +/** + * Data loading rule class to validate and update the data loading query plan + * Validation rule: + * 1. update the avg aggregate expression with two columns sum and count + * 2. 
Remove duplicate sum and count expression if already there in plan + * @param sparkSession + * spark session + */ +case class CarbonPreAggregateDataLoadingRules(sparkSession: SparkSession) + extends Rule[LogicalPlan] { + lazy val parser = new CarbonSpark2SqlParser override def apply(plan: LogicalPlan): LogicalPlan = { - val validExpressionsMap = scala.collection.mutable.LinkedHashMap.empty[String, NamedExpression] + val validExpressionsMap = scala.collection.mutable.HashSet.empty[AggExpToColumnMappingModel] + val namedExpressionList = scala.collection.mutable.ListBuffer.empty[NamedExpression] plan transform { - case aggregate@Aggregate(_, aExp, _) if validateAggregateExpressions(aExp) => + case aggregate@Aggregate(_, + aExp, + CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) + if validateAggregateExpressions(aExp) && + logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => + val carbonTable = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation] + .carbonTable aExp.foreach { - case alias: Alias => - validExpressionsMap ++= validateAggregateFunctionAndGetAlias(alias) - case _: UnresolvedAlias => - case namedExpr: NamedExpression => validExpressionsMap.put(namedExpr.name, namedExpr) + case attr: AttributeReference => + namedExpressionList += attr + case alias@Alias(_: AttributeReference, _) => + namedExpressionList += alias + case alias@Alias(aggExp: AggregateExpression, name) => + // get the updated expression for avg convert it to two expression + // sum and count + val expressions = PreAggregateUtil.getUpdateAggregateExpressions(aggExp) + // if size is more than one then it was for average + if(expressions.size > 1) { + // get the logical plan for sum expression + val logicalPlan_sum = PreAggregateUtil.getLogicalPlanFromAggExp( + expressions.head, + carbonTable.getTableName, + carbonTable.getDatabaseName, + logicalRelation, + sparkSession, + parser) + // get the logical plan fro count expression + val logicalPlan_count = 
PreAggregateUtil.getLogicalPlanFromAggExp( + expressions.last, + carbonTable.getTableName, + carbonTable.getDatabaseName, + logicalRelation, + sparkSession, + parser) + // check with same expression already sum is present then do not add to + // named expression list otherwise update the list and add it to set + if (!validExpressionsMap.contains(AggExpToColumnMappingModel(logicalPlan_sum))) { + namedExpressionList += + Alias(expressions.head, name + " _ sum")(NamedExpression.newExprId, + alias.qualifier, + Some(alias.metadata), + alias.isGenerated) + validExpressionsMap += AggExpToColumnMappingModel(logicalPlan_sum) + } + // check with same expression already count is present then do not add to + // named expression list otherwise update the list and add it to set + if (!validExpressionsMap.contains(AggExpToColumnMappingModel(logicalPlan_count))) { + namedExpressionList += + Alias(expressions.last, name + " _ count")(NamedExpression.newExprId, + alias.qualifier, + Some(alias.metadata), + alias.isGenerated) + validExpressionsMap += AggExpToColumnMappingModel(logicalPlan_count) + } + } else { + // get the logical plan for expression + val logicalPlan = PreAggregateUtil.getLogicalPlanFromAggExp( + expressions.head, + carbonTable.getTableName, + carbonTable.getDatabaseName, + logicalRelation, + sparkSession, + parser) + // check with same expression already present then do not add to + // named expression list otherwise update the list and add it to set + if (!validExpressionsMap.contains(AggExpToColumnMappingModel(logicalPlan))) { + namedExpressionList+=alias + validExpressionsMap += AggExpToColumnMappingModel(logicalPlan) + } + } + case alias@Alias(_: Expression, _) => + namedExpressionList += alias } - aggregate.copy(aggregateExpressions = validExpressionsMap.values.toSeq) + aggregate.copy(aggregateExpressions = namedExpressionList.toSeq) case plan: LogicalPlan => plan } } - /** - * This method will split the avg column into sum and count and will return a sequence 
of tuple - * of unique name, alias - * - */ - private def validateAggregateFunctionAndGetAlias(alias: Alias): Seq[(String, - NamedExpression)] = { - alias match { - case udf@Alias(_: ScalaUDF, name) => - Seq((name, udf)) - case alias@Alias(attrExpression: AggregateExpression, _) => - attrExpression.aggregateFunction match { - case Sum(attr: AttributeReference) => - (attr.name + "_sum", alias) :: Nil - case Sum(MatchCastExpression(attr: AttributeReference, _)) => - (attr.name + "_sum", alias) :: Nil - case Count(Seq(attr: AttributeReference)) => - (attr.name + "_count", alias) :: Nil - case Count(Seq(MatchCastExpression(attr: AttributeReference, _))) => - (attr.name + "_count", alias) :: Nil - case Average(attr: AttributeReference) => - Seq((attr.name + "_sum", Alias(attrExpression. - copy(aggregateFunction = Sum(attr), - resultId = NamedExpression.newExprId), attr.name + "_sum")()), - (attr.name, Alias(attrExpression. - copy(aggregateFunction = Count(attr), - resultId = NamedExpression.newExprId), attr.name + "_count")())) - case Average(cast@MatchCastExpression(attr: AttributeReference, _)) => - Seq((attr.name + "_sum", Alias(attrExpression. - copy(aggregateFunction = Sum(cast), - resultId = NamedExpression.newExprId), - attr.name + "_sum")()), - (attr.name, Alias(attrExpression. - copy(aggregateFunction = Count(cast), resultId = - NamedExpression.newExprId), attr.name + "_count")())) - case _ => Seq(("", alias)) - } - - } - } - /** * Called by PreAggregateLoadingRules to validate if plan is valid for applying rules or not. * If the plan has PreAggLoad i.e Loading UDF and does not have PreAgg i.e Query UDF then it is * valid. - * * @param namedExpression - * @return + * named expressions --- End diff -- move it up. comment should like `@param namedExpression named expressions` --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728 Build failed with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2551/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728 Build failed with Spark 2.2.1. Please check CI: http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1326/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728 Build failed with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2562/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1728 SDV build failed. Please check CI: http://144.76.159.231:8080/job/ApacheSDVTests/2722/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728 Build failed with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2566/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728 Build failed with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2568/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1728 SDV build failed. Please check CI: http://144.76.159.231:8080/job/ApacheSDVTests/2732/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1728 SDV build failed. Please check CI: http://144.76.159.231:8080/job/ApacheSDVTests/2733/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1728 SDV build failed. Please check CI: http://144.76.159.231:8080/job/ApacheSDVTests/2734/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1728 SDV build failed. Please check CI: http://144.76.159.231:8080/job/ApacheSDVTests/2735/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1728 SDV build failed. Please check CI: http://144.76.159.231:8080/job/ApacheSDVTests/2738/ --- |
In reply to this post by qiuchenjian-2
Github user kumarvishal09 commented on the issue:
https://github.com/apache/carbondata/pull/1728 retest this please --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728 Build failed with Spark 2.2.1. Please check CI: http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1338/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728 Build failed with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2575/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728 Build failed with Spark 2.1.0. Please check CI: http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2577/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728 Build failed with Spark 2.2.1. Please check CI: http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1343/ --- |
Free forum by Nabble | Edit this page |