Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159058395

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---

@@ -933,121 +1143,122 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
    * child data map schema
    * @param attributes
    * child logical relation
+   * @param expLogicalPlanToColumnSchemaMapping
+   *   expression logical plan to data map column mapping
+   * @param parentTable
+   *   parent carbon table
+   * @param logicalRelation
+   *   logical relation
    * @return updated expression
    */
   def getUpdatedAggregateExpressionForChild(aggExp: AggregateExpression,
-      dataMapSchema: DataMapSchema,
-      attributes: Seq[AttributeReference]):
+      dataMapSchema: AggregationDataMapSchema,
+      attributes: Seq[AttributeReference],
+      expLogicalPlanToColumnSchemaMapping: Option[Map[LogicalPlan, ColumnSchema]],
+      parentTable: CarbonTable,
+      logicalRelation: LogicalRelation):
   Expression = {
+    val updatedAggExp = getUpdateAggregateExpressions(aggExp)

--- End diff --

Add code comment here

---
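A sketch of the kind of comment the review is asking for; the wording is an assumption inferred from the surrounding scaladoc and method name, not text that was actually added to the PR:

```scala
// rewrite the query-side aggregate expression into the form stored in the
// child (pre-aggregate) table before its attributes are mapped onto the
// child table's columns; the actual rewrite is delegated to
// getUpdateAggregateExpressions (comment wording assumed)
val updatedAggExp = getUpdateAggregateExpressions(aggExp)
```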
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159058689

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---

@@ -294,12 +149,30 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
     val aggregateTableSelector = new AggregateTableSelector(queryPlan, carbonTable)
     // select the list of valid child tables
     val selectedDataMapSchemas = aggregateTableSelector.selectPreAggDataMapSchema()
+    // query has only aggregate expression then selected data map will be empty
+    // the validate all the child data map otherwise validate selected data map
+    var selectedAggMaps = if (isProjectionColumnPresent) {
+      selectedDataMapSchemas
+    } else {
+      carbonTable.getTableInfo.getDataMapSchemaList
+    }
+    val aggExpLogicalPlans = aggregateExpressions.map { queryAggExp =>
+      getLogicalPlanFromAggExp(queryAggExp,
+        carbonTable.getTableName,
+        carbonTable.getDatabaseName, logicalRelation)
+    }.toSeq
+    if(aggregateExpressions.size > 0 && selectedAggMaps.size > 0) {

--- End diff --

add comments

---
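One way the "add comments" request might be addressed, shown only as a sketch; the comment wording is an assumption drawn from what the quoted code appears to do, not wording from the PR:

```scala
// When the query has no plain projection columns (only aggregate expressions),
// AggregateTableSelector returns nothing, so fall back to validating every
// child data map of the table; otherwise validate only the selected data maps.
var selectedAggMaps = if (isProjectionColumnPresent) {
  selectedDataMapSchemas
} else {
  carbonTable.getTableInfo.getDataMapSchemaList
}
// Convert each aggregate expression from the query into a logical plan so it
// can be compared against the expressions stored with the child data maps.
val aggExpLogicalPlans = aggregateExpressions.map { queryAggExp =>
  getLogicalPlanFromAggExp(queryAggExp,
    carbonTable.getTableName,
    carbonTable.getDatabaseName, logicalRelation)
}.toSeq
```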
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728 Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1219/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2451/ --- |
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159378009

--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala ---

@@ -0,0 +1,105 @@
+package org.apache.carbondata.integration.spark.testsuite.preaggregate
+
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Row}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll: Unit = {
+    sql("drop table if exists mainTable")
+    sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
+    sql("create datamap agg0 on table mainTable using 'preaggregate' as select name,count(age) from mainTable group by name")
+    sql("create datamap agg1 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end) from mainTable group by name")
+    sql("create datamap agg2 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end),city from mainTable group by name,city")
+    sql("create datamap agg3 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end) from mainTable group by name")
+    sql("create datamap agg4 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from mainTable group by name")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
+  }
+
+  test("test pre agg create table with expression 1") {
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0"), true, "maintable_age_count")
+  }
+
+  test("test pre agg create table with expression 2") {
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg1"), true, "maintable_column_0_sum")
+  }
+
+  test("test pre agg create table with expression 3") {
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg2"), true, "maintable_column_0_sum")
+  }
+
+  test("test pre agg create table with expression 4") {
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg3"), true, "maintable_column_0_sum")
+  }
+
+  test("test pre agg create table with expression 5") {
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_0_sum")
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_1_sum")
+  }
+
+  test("test pre agg table selection with expression 1") {
+    val df = sql("select name as NewName, count(age) as sum from mainTable group by name order by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
+  }
+
+  test("test pre agg table selection with expression 2") {
+    val df = sql("select name as NewName, sum(case when age=35 then id else 0 end) as sum from mainTable group by name order by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
+  }
+
+  test("test pre agg table selection with expression 3") {
+    val df = sql("select sum(case when age=35 then id else 0 end) from maintable")
+    checkAnswer(df, Seq(Row(6.0)))
+  }
+
+  test("test pre agg table selection with expression 4") {
+    val df = sql("select sum(case when age=27 then id else 0 end) from maintable")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg3")
+    checkAnswer(df, Seq(Row(2.0)))
+  }
+
+  test("test pre agg table selection with expression 5") {
+    val df = sql("select sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from maintable")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg4")
+    checkAnswer(df, Seq(Row(2.0,6.0)))
+  }
+

--- End diff --

add TestPreAggregateWithSubQuery

---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2508/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728 Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1283/ --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159435962

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---

@@ -18,28 +18,46 @@ package org.apache.spark.sql.hive
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
-import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql._
-import org.apache.spark.sql.CarbonExpressions.{CarbonScalaUDF, CarbonSubqueryAlias, MatchCast, MatchCastExpression}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.CarbonExpressions.{CarbonScalaUDF, CarbonSubqueryAlias, MatchCastExpression}
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Divide, Expression, Literal, NamedExpression, ScalaUDF, SortOrder}
-import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, Divide, Expression, Literal, NamedExpression, ScalaUDF, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{Count, _}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation}
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.CarbonReflectionUtils
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.preagg.{AggregateTableSelector, QueryColumn, QueryPlan}
 import org.apache.carbondata.core.util.{CarbonUtil, ThreadLocalSessionInfo}
-import org.apache.carbondata.spark.util.CarbonScalaUtil
+/**
+ * model class to store aggregate expression logical plan
+ * and its column schema mapping
+ * @param logicalPlan
+ *   logical plan of aggregate expression
+ * @param columnSchema
+ *   column schema from table
+ */
+case class AggExpToColumnMappingModel(var logicalPlan: LogicalPlan,

--- End diff --

Move `var logicalPlan: LogicalPlan` to next line

---
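The layout the reviewer is asking for would presumably be along these lines; the second parameter and its type are assumptions inferred from the @param columnSchema scaladoc, since the rest of the declaration is not visible in this hunk:

```scala
case class AggExpToColumnMappingModel(
    var logicalPlan: LogicalPlan,
    var columnSchema: Option[ColumnSchema] = None) // second field and type assumed from the scaladoc
```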
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159438073 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule needAnalysis = false attr } + if(needAnalysis) { + needAnalysis = isValidPlan(plan) + } // if plan is not valid for transformation then return same plan if (!needAnalysis) { plan } else { - // create buffer to collect all the column and its metadata information - val list = scala.collection.mutable.HashSet.empty[QueryColumn] - var isValidPlan = true - val carbonTable = plan match { - // matching the plan based on supported plan - // if plan is matches with any case it will validate and get all - // information required for transforming the plan + val updatedPlan = transformPreAggQueryPlan(plan) + val newPlan = updatePlan(updatedPlan) + print(newPlan.toString()) + newPlan + } + } - // When plan has grouping expression, aggregate expression - // subquery - case Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - carbonTable + /** + * Below method will be used to update the child plan + * This will be used for updating expression like join condition, + * order by, project list etc + * @param plan + * child plan + * @return updated plan + */ + def updatePlan(plan: LogicalPlan) : LogicalPlan = { + val updatedPlan = plan transform { + case Aggregate(grp, aggExp, child) => + Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child) + case Filter(filterExp, child) => + Filter(updateConditionExpression(Some(filterExp)).get, child) + case Project(projectList, child) => + Project(updateNamedExpression(projectList), child) + case Sort(sortOrders, global, child) => + Sort(updateSortExpression(sortOrders), global, child) + case Join(left, right, joinType, condition) => + Join(left, right, joinType, updateConditionExpression(condition)) + } + updatedPlan + } - // below case for handling filter query - // When plan has grouping expression, aggregate expression - // filter expression - case Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) - } - // getting the columns from filter expression - if(isValidPlan) { - isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) + /** + * Below method will be used to update the sort expression + * @param sortExp + * sort order expression in query + * @return updated sort expression + */ + def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = { + sortExp map { order => + order.child match { --- End diff -- Better do the transform instead of match as in case if Alias comes match cannot work --- |
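A hedged sketch of the reviewer's suggestion: rewriting updateSortExpression with transform so that an AttributeReference nested under an Alias (or any other wrapper) is still replaced. updatedExpression is assumed to be the mapping of query attributes to child-table attributes used elsewhere in this rule, as in the hunk above; the qualifier handling is kept as in the original code.

```scala
def updateSortExpression(sortExp: Seq[SortOrder]): Seq[SortOrder] = {
  sortExp.map { order =>
    // transform walks the whole child expression tree, so the attribute is
    // rewritten even when it is wrapped in an Alias or a function call
    val newChild = order.child.transform {
      case attr: AttributeReference =>
        updatedExpression.find(_._1.sameRef(attr)) match {
          case Some((_, childAttr)) =>
            AttributeReference(childAttr.name, childAttr.dataType, childAttr.nullable,
              childAttr.metadata)(childAttr.exprId, attr.qualifier, attr.isGenerated)
          case None => attr
        }
    }
    SortOrder(newChild, order.direction)
  }
}
```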
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159438250 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule needAnalysis = false attr } + if(needAnalysis) { + needAnalysis = isValidPlan(plan) + } // if plan is not valid for transformation then return same plan if (!needAnalysis) { plan } else { - // create buffer to collect all the column and its metadata information - val list = scala.collection.mutable.HashSet.empty[QueryColumn] - var isValidPlan = true - val carbonTable = plan match { - // matching the plan based on supported plan - // if plan is matches with any case it will validate and get all - // information required for transforming the plan + val updatedPlan = transformPreAggQueryPlan(plan) + val newPlan = updatePlan(updatedPlan) + print(newPlan.toString()) + newPlan + } + } - // When plan has grouping expression, aggregate expression - // subquery - case Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - carbonTable + /** + * Below method will be used to update the child plan + * This will be used for updating expression like join condition, + * order by, project list etc + * @param plan + * child plan + * @return updated plan + */ + def updatePlan(plan: LogicalPlan) : LogicalPlan = { + val updatedPlan = plan transform { + case Aggregate(grp, aggExp, child) => + Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child) + case Filter(filterExp, child) => + Filter(updateConditionExpression(Some(filterExp)).get, child) + case Project(projectList, child) => + Project(updateNamedExpression(projectList), child) + case Sort(sortOrders, global, child) => + Sort(updateSortExpression(sortOrders), global, child) + case Join(left, right, joinType, condition) => + Join(left, right, joinType, updateConditionExpression(condition)) + } + updatedPlan + } - // below case for handling filter query - // When plan has grouping expression, aggregate expression - // filter expression - case Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) - } - // getting the columns from filter expression - if(isValidPlan) { - isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) + /** + * Below method will be used to update the sort expression + * @param sortExp + * sort order expression in query + * @return updated sort expression + */ + def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = { + sortExp map { order => + order.child match { + case attr: AttributeReference => + val childExp = updatedExpression.find { p => p._1.sameRef(attr) } + if (childExp.isDefined) { --- End diff -- Better use `match { case }` instead of if else --- |
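For the isDefined/if-else check itself, the match { case } form the reviewer prefers could look like the fragment below; copyToChildAttribute is a hypothetical shorthand for the AttributeReference copy spelled out in the hunk above, and updatedExpression is the same mapping assumed in the previous sketch.

```scala
updatedExpression.find(_._1.sameRef(attr)) match {
  case Some((_, childAttr)) => SortOrder(copyToChildAttribute(attr, childAttr), order.direction)
  case None                 => SortOrder(attr, order.direction)
}
```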
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159438754 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule needAnalysis = false attr } + if(needAnalysis) { + needAnalysis = isValidPlan(plan) + } // if plan is not valid for transformation then return same plan if (!needAnalysis) { plan } else { - // create buffer to collect all the column and its metadata information - val list = scala.collection.mutable.HashSet.empty[QueryColumn] - var isValidPlan = true - val carbonTable = plan match { - // matching the plan based on supported plan - // if plan is matches with any case it will validate and get all - // information required for transforming the plan + val updatedPlan = transformPreAggQueryPlan(plan) + val newPlan = updatePlan(updatedPlan) + print(newPlan.toString()) + newPlan + } + } - // When plan has grouping expression, aggregate expression - // subquery - case Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - carbonTable + /** + * Below method will be used to update the child plan + * This will be used for updating expression like join condition, + * order by, project list etc + * @param plan + * child plan + * @return updated plan + */ + def updatePlan(plan: LogicalPlan) : LogicalPlan = { + val updatedPlan = plan transform { + case Aggregate(grp, aggExp, child) => + Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child) + case Filter(filterExp, child) => + Filter(updateConditionExpression(Some(filterExp)).get, child) + case Project(projectList, child) => + Project(updateNamedExpression(projectList), child) + case Sort(sortOrders, global, child) => + Sort(updateSortExpression(sortOrders), global, child) + case Join(left, right, joinType, condition) => + Join(left, right, joinType, updateConditionExpression(condition)) + } + updatedPlan + } - // below case for handling filter query - // When plan has grouping expression, aggregate expression - // filter expression - case Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) - } - // getting the columns from filter expression - if(isValidPlan) { - isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) + /** + * Below method will be used to update the sort expression + * @param sortExp + * sort order expression in query + * @return updated sort expression + */ + def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = { + sortExp map { order => + order.child match { + case attr: AttributeReference => + val childExp = updatedExpression.find { p => p._1.sameRef(attr) } + if (childExp.isDefined) { + val newExpression = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) --- End diff -- why you use `attr.qualifier ` intead of `childExp.qualifier` --- |
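For reference, the variant the reviewer seems to have in mind takes the qualifier from the mapped child attribute rather than from the original one; whether that is the right choice depends on how the child table is aliased in the rewritten plan, which this hunk does not show.

```scala
AttributeReference(childExp.get._2.name,
  childExp.get._2.dataType,
  childExp.get._2.nullable,
  childExp.get._2.metadata)(childExp.get._2.exprId, childExp.get._2.qualifier, attr.isGenerated)
```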
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159440196 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule needAnalysis = false attr } + if(needAnalysis) { + needAnalysis = isValidPlan(plan) + } // if plan is not valid for transformation then return same plan if (!needAnalysis) { plan } else { - // create buffer to collect all the column and its metadata information - val list = scala.collection.mutable.HashSet.empty[QueryColumn] - var isValidPlan = true - val carbonTable = plan match { - // matching the plan based on supported plan - // if plan is matches with any case it will validate and get all - // information required for transforming the plan + val updatedPlan = transformPreAggQueryPlan(plan) + val newPlan = updatePlan(updatedPlan) + print(newPlan.toString()) + newPlan + } + } - // When plan has grouping expression, aggregate expression - // subquery - case Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - carbonTable + /** + * Below method will be used to update the child plan + * This will be used for updating expression like join condition, + * order by, project list etc + * @param plan + * child plan + * @return updated plan + */ + def updatePlan(plan: LogicalPlan) : LogicalPlan = { + val updatedPlan = plan transform { + case Aggregate(grp, aggExp, child) => + Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child) + case Filter(filterExp, child) => + Filter(updateConditionExpression(Some(filterExp)).get, child) + case Project(projectList, child) => + Project(updateNamedExpression(projectList), child) + case Sort(sortOrders, global, child) => + Sort(updateSortExpression(sortOrders), global, child) + case Join(left, right, joinType, condition) => + Join(left, right, joinType, updateConditionExpression(condition)) + } + updatedPlan + } - // below case for handling filter query - // When plan has grouping expression, aggregate expression - // filter expression - case Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) - } - // getting the columns from filter expression - if(isValidPlan) { - isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) + /** + * Below method will be used to update the sort expression + * @param sortExp + * sort order expression in query + * @return updated sort expression + */ + def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = { + sortExp map { order => + order.child match { + case attr: AttributeReference => + val childExp = updatedExpression.find { p => p._1.sameRef(attr) } + if (childExp.isDefined) { + val newExpression = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + SortOrder(newExpression, order.direction) + } else { + SortOrder(attr, order.direction) } - carbonTable + } + } + } - // When plan has grouping expression, aggregate expression - // logical relation - case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - carbonTable - // case for handling aggregation, order by - case Project(projectList, - Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - list ++ - extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName) - } - carbonTable - // case for handling aggregation, order by and filter - case Project(projectList, - Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) + /** + * Below method will be used to update the expression like group by expression + * @param expressions + * sequence of expression like group by + * @return updated expressions + */ + def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = { + val newExp = expressions map { expression => + expression transform { + case attr: AttributeReference => + val childExp = updatedExpression.find { p => p._1.sameRef(attr) } + if (childExp.isDefined) { --- End diff -- Better use match { case } instead of if else --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159440457 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule needAnalysis = false attr } + if(needAnalysis) { + needAnalysis = isValidPlan(plan) + } // if plan is not valid for transformation then return same plan if (!needAnalysis) { plan } else { - // create buffer to collect all the column and its metadata information - val list = scala.collection.mutable.HashSet.empty[QueryColumn] - var isValidPlan = true - val carbonTable = plan match { - // matching the plan based on supported plan - // if plan is matches with any case it will validate and get all - // information required for transforming the plan + val updatedPlan = transformPreAggQueryPlan(plan) + val newPlan = updatePlan(updatedPlan) + print(newPlan.toString()) + newPlan + } + } - // When plan has grouping expression, aggregate expression - // subquery - case Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - carbonTable + /** + * Below method will be used to update the child plan + * This will be used for updating expression like join condition, + * order by, project list etc + * @param plan + * child plan + * @return updated plan + */ + def updatePlan(plan: LogicalPlan) : LogicalPlan = { + val updatedPlan = plan transform { + case Aggregate(grp, aggExp, child) => + Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child) + case Filter(filterExp, child) => + Filter(updateConditionExpression(Some(filterExp)).get, child) + case Project(projectList, child) => + Project(updateNamedExpression(projectList), child) + case Sort(sortOrders, global, child) => + Sort(updateSortExpression(sortOrders), global, child) + case Join(left, right, joinType, condition) => + Join(left, right, joinType, updateConditionExpression(condition)) + } + updatedPlan + } - // below case for handling filter query - // When plan has grouping expression, aggregate expression - // filter expression - case Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) - } - // getting the columns from filter expression - if(isValidPlan) { - isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) + /** + * Below method will be used to update the sort expression + * @param sortExp + * sort order expression in query + * @return updated sort expression + */ + def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = { + sortExp map { order => + order.child match { + case attr: AttributeReference => + val childExp = updatedExpression.find { p => p._1.sameRef(attr) } + if (childExp.isDefined) { + val newExpression = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + SortOrder(newExpression, order.direction) + } else { + SortOrder(attr, order.direction) } - carbonTable + } + } + } - // When plan has grouping expression, aggregate expression - // logical relation - case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - carbonTable - // case for handling aggregation, order by - case Project(projectList, - Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - list ++ - extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName) - } - carbonTable - // case for handling aggregation, order by and filter - case Project(projectList, - Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) + /** + * Below method will be used to update the expression like group by expression + * @param expressions + * sequence of expression like group by + * @return updated expressions + */ + def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = { + val newExp = expressions map { expression => + expression transform { + case attr: AttributeReference => + val childExp = updatedExpression.find { p => p._1.sameRef(attr) } + if (childExp.isDefined) { + val newExpression = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + newExpression --- End diff -- not required to assign to variable --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159442284 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule needAnalysis = false attr } + if(needAnalysis) { + needAnalysis = isValidPlan(plan) + } // if plan is not valid for transformation then return same plan if (!needAnalysis) { plan } else { - // create buffer to collect all the column and its metadata information - val list = scala.collection.mutable.HashSet.empty[QueryColumn] - var isValidPlan = true - val carbonTable = plan match { - // matching the plan based on supported plan - // if plan is matches with any case it will validate and get all - // information required for transforming the plan + val updatedPlan = transformPreAggQueryPlan(plan) + val newPlan = updatePlan(updatedPlan) + print(newPlan.toString()) + newPlan + } + } - // When plan has grouping expression, aggregate expression - // subquery - case Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - carbonTable + /** + * Below method will be used to update the child plan + * This will be used for updating expression like join condition, + * order by, project list etc + * @param plan + * child plan + * @return updated plan + */ + def updatePlan(plan: LogicalPlan) : LogicalPlan = { + val updatedPlan = plan transform { + case Aggregate(grp, aggExp, child) => + Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child) + case Filter(filterExp, child) => + Filter(updateConditionExpression(Some(filterExp)).get, child) + case Project(projectList, child) => + Project(updateNamedExpression(projectList), child) + case Sort(sortOrders, global, child) => + Sort(updateSortExpression(sortOrders), global, child) + case Join(left, right, joinType, condition) => + Join(left, right, joinType, updateConditionExpression(condition)) + } + updatedPlan + } - // below case for handling filter query - // When plan has grouping expression, aggregate expression - // filter expression - case Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) - } - // getting the columns from filter expression - if(isValidPlan) { - isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) + /** + * Below method will be used to update the sort expression + * @param sortExp + * sort order expression in query + * @return updated sort expression + */ + def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = { + sortExp map { order => + order.child match { + case attr: AttributeReference => + val childExp = updatedExpression.find { p => p._1.sameRef(attr) } + if (childExp.isDefined) { + val newExpression = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + SortOrder(newExpression, order.direction) + } else { + SortOrder(attr, order.direction) } - carbonTable + } + } + } - // When plan has grouping expression, aggregate expression - // logical relation - case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - carbonTable - // case for handling aggregation, order by - case Project(projectList, - Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - list ++ - extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName) - } - carbonTable - // case for handling aggregation, order by and filter - case Project(projectList, - Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) + /** + * Below method will be used to update the expression like group by expression + * @param expressions + * sequence of expression like group by + * @return updated expressions + */ + def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = { + val newExp = expressions map { expression => + expression transform { + case attr: AttributeReference => + val childExp = updatedExpression.find { p => p._1.sameRef(attr) } + if (childExp.isDefined) { + val newExpression = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + newExpression + } else { + attr } - if (isValidPlan) { - list ++ - extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName) - isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) + } + } + newExp + } + + /** + * Below method will be used to updated the named expression like aggregate expression + * @param namedExpression + * any named expression like aggregate expression + * @return updated named expression + */ + def updateNamedExpression(namedExpression: Seq[NamedExpression]) : Seq[NamedExpression] = { + namedExpression map { + case attr: AttributeReference => --- End diff -- use transform inside map , then you no need to handle Alias separetly. --- |
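A hedged sketch of this suggestion for updateNamedExpression: running transform inside the map means the AttributeReference case also fires for attributes nested under an Alias, so the separate Alias branch disappears. The cast back to NamedExpression assumes the top-level node type is preserved by the rewrite (an Alias stays an Alias, an attribute is replaced by another attribute), as it is in the hunk above.

```scala
def updateNamedExpression(namedExpressions: Seq[NamedExpression]): Seq[NamedExpression] = {
  namedExpressions.map { named =>
    named.transform {
      case attr: AttributeReference =>
        updatedExpression.find(_._1.sameRef(attr)) match {
          case Some((_, childAttr)) =>
            AttributeReference(childAttr.name, childAttr.dataType, childAttr.nullable,
              childAttr.metadata)(childAttr.exprId, attr.qualifier, attr.isGenerated)
          case None => attr
        }
    }.asInstanceOf[NamedExpression]
  }
}
```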
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159442746 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule needAnalysis = false attr } + if(needAnalysis) { + needAnalysis = isValidPlan(plan) + } // if plan is not valid for transformation then return same plan if (!needAnalysis) { plan } else { - // create buffer to collect all the column and its metadata information - val list = scala.collection.mutable.HashSet.empty[QueryColumn] - var isValidPlan = true - val carbonTable = plan match { - // matching the plan based on supported plan - // if plan is matches with any case it will validate and get all - // information required for transforming the plan + val updatedPlan = transformPreAggQueryPlan(plan) + val newPlan = updatePlan(updatedPlan) + print(newPlan.toString()) + newPlan + } + } - // When plan has grouping expression, aggregate expression - // subquery - case Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - carbonTable + /** + * Below method will be used to update the child plan + * This will be used for updating expression like join condition, + * order by, project list etc + * @param plan + * child plan + * @return updated plan + */ + def updatePlan(plan: LogicalPlan) : LogicalPlan = { + val updatedPlan = plan transform { + case Aggregate(grp, aggExp, child) => + Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child) + case Filter(filterExp, child) => + Filter(updateConditionExpression(Some(filterExp)).get, child) + case Project(projectList, child) => + Project(updateNamedExpression(projectList), child) + case Sort(sortOrders, global, child) => + Sort(updateSortExpression(sortOrders), global, child) + case Join(left, right, joinType, condition) => + Join(left, right, joinType, updateConditionExpression(condition)) + } + updatedPlan + } - // below case for handling filter query - // When plan has grouping expression, aggregate expression - // filter expression - case Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) - } - // getting the columns from filter expression - if(isValidPlan) { - isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) + /** + * Below method will be used to update the sort expression + * @param sortExp + * sort order expression in query + * @return updated sort expression + */ + def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = { + sortExp map { order => + order.child match { + case attr: AttributeReference => + val childExp = updatedExpression.find { p => p._1.sameRef(attr) } + if (childExp.isDefined) { + val newExpression = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + SortOrder(newExpression, order.direction) + } else { + SortOrder(attr, order.direction) } - carbonTable + } + } + } - // When plan has grouping expression, aggregate expression - // logical relation - case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - carbonTable - // case for handling aggregation, order by - case Project(projectList, - Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - list ++ - extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName) - } - carbonTable - // case for handling aggregation, order by and filter - case Project(projectList, - Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) + /** + * Below method will be used to update the expression like group by expression + * @param expressions + * sequence of expression like group by + * @return updated expressions + */ + def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = { + val newExp = expressions map { expression => + expression transform { + case attr: AttributeReference => + val childExp = updatedExpression.find { p => p._1.sameRef(attr) } + if (childExp.isDefined) { + val newExpression = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + newExpression + } else { + attr } - if (isValidPlan) { - list ++ - extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName) - isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) + } + } + newExp + } + + /** + * Below method will be used to updated the named expression like aggregate expression + * @param namedExpression + * any named expression like aggregate expression + * @return updated named expression + */ + def updateNamedExpression(namedExpression: Seq[NamedExpression]) : Seq[NamedExpression] = { --- End diff -- You can directly use `updateExpression` no need of this method. `NamedExpression` is derived from `Expression`. So after updation you can type cast. --- |
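A minimal sketch of the reuse the reviewer describes: since NamedExpression extends Expression, the generic updateExpression can do the rewriting and the result can be cast back afterwards. This assumes updateExpression leaves the top-level Alias/AttributeReference nodes in place, as in the hunk above.

```scala
def updateNamedExpression(namedExpression: Seq[NamedExpression]): Seq[NamedExpression] = {
  // delegate the rewrite to updateExpression and cast each result back,
  // element by element, to keep the NamedExpression type
  updateExpression(namedExpression).map(_.asInstanceOf[NamedExpression])
}
```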
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159443128 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule needAnalysis = false attr } + if(needAnalysis) { + needAnalysis = isValidPlan(plan) + } // if plan is not valid for transformation then return same plan if (!needAnalysis) { plan } else { - // create buffer to collect all the column and its metadata information - val list = scala.collection.mutable.HashSet.empty[QueryColumn] - var isValidPlan = true - val carbonTable = plan match { - // matching the plan based on supported plan - // if plan is matches with any case it will validate and get all - // information required for transforming the plan + val updatedPlan = transformPreAggQueryPlan(plan) + val newPlan = updatePlan(updatedPlan) + print(newPlan.toString()) + newPlan + } + } - // When plan has grouping expression, aggregate expression - // subquery - case Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - carbonTable + /** + * Below method will be used to update the child plan + * This will be used for updating expression like join condition, + * order by, project list etc + * @param plan + * child plan + * @return updated plan + */ + def updatePlan(plan: LogicalPlan) : LogicalPlan = { + val updatedPlan = plan transform { + case Aggregate(grp, aggExp, child) => + Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child) + case Filter(filterExp, child) => + Filter(updateConditionExpression(Some(filterExp)).get, child) + case Project(projectList, child) => + Project(updateNamedExpression(projectList), child) + case Sort(sortOrders, global, child) => + Sort(updateSortExpression(sortOrders), global, child) + case Join(left, right, joinType, condition) => + Join(left, right, joinType, updateConditionExpression(condition)) + } + updatedPlan + } - // below case for handling filter query - // When plan has grouping expression, aggregate expression - // filter expression - case Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) - } - // getting the columns from filter expression - if(isValidPlan) { - isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) + /** + * Below method will be used to update the sort expression + * @param sortExp + * sort order expression in query + * @return updated sort expression + */ + def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = { + sortExp map { order => + order.child match { + case attr: AttributeReference => + val childExp = updatedExpression.find { p => p._1.sameRef(attr) } + if (childExp.isDefined) { + val newExpression = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + SortOrder(newExpression, order.direction) + } else { + SortOrder(attr, order.direction) } - carbonTable + } + } + } - // When plan has grouping expression, aggregate expression - // logical relation - case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - carbonTable - // case for handling aggregation, order by - case Project(projectList, - Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - list ++ - extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName) - } - carbonTable - // case for handling aggregation, order by and filter - case Project(projectList, - Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) + /** + * Below method will be used to update the expression like group by expression + * @param expressions + * sequence of expression like group by + * @return updated expressions + */ + def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = { + val newExp = expressions map { expression => + expression transform { + case attr: AttributeReference => + val childExp = updatedExpression.find { p => p._1.sameRef(attr) } + if (childExp.isDefined) { + val newExpression = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + newExpression + } else { + attr } - if (isValidPlan) { - list ++ - extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName) - isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) + } + } + newExp + } + + /** + * Below method will be used to updated the named expression like aggregate expression + * @param namedExpression + * any named expression like aggregate expression + * @return updated named expression + */ + def updateNamedExpression(namedExpression: Seq[NamedExpression]) : Seq[NamedExpression] = { + namedExpression map { + case attr: AttributeReference => + val childExp = updatedExpression.find(p => p._1.sameRef(attr)) + if(childExp.isDefined) { + val newExp = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + newExp + } else { + attr + } + case alias@Alias(exp, name) => + val newExp = exp.transform { + case attr: AttributeReference => + val childExp = updatedExpression.find(p => p._1.sameRef(attr)) + if (childExp.isDefined) { + val newExp = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + newExp + } else { + attr + } + } + Alias(newExp, name)(alias.exprId, alias.qualifier, Some(alias.metadata), alias.isGenerated) + } + } + + /** + * Below method will be used to updated condition expression + * @param conditionExp + * any condition expression join condition or filter condition + * @return updated condition expression + */ + def updateConditionExpression(conditionExp: Option[Expression]): Option[Expression] = { --- End diff -- Here also you use updateExpression method, no need of this method --- |
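A minimal sketch of what this comment seems to suggest: since a filter or join condition is just an `Expression`, the existing `updateExpression` helper can be called directly from `updatePlan`, making the separate `updateConditionExpression` method unnecessary. This assumes the surrounding rule's helper methods (`updateExpression`, `updateNamedExpression`, `updateSortExpression`) exactly as shown in the diff.

```scala
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, LogicalPlan, Project, Sort}

// Sketch only: reuse updateExpression for filter and join conditions so the
// dedicated updateConditionExpression method is no longer needed.
def updatePlan(plan: LogicalPlan): LogicalPlan = {
  plan transform {
    case Aggregate(grp, aggExp, child) =>
      Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child)
    case Filter(filterExp, child) =>
      // a filter condition is a single Expression, so wrap it in a Seq
      Filter(updateExpression(Seq(filterExp)).head, child)
    case Project(projectList, child) =>
      Project(updateNamedExpression(projectList), child)
    case Sort(sortOrders, global, child) =>
      Sort(updateSortExpression(sortOrders), global, child)
    case Join(left, right, joinType, condition) =>
      Join(left, right, joinType, condition.map(c => updateExpression(Seq(c)).head))
  }
}
```
---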
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159443443 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule needAnalysis = false attr } + if(needAnalysis) { + needAnalysis = isValidPlan(plan) + } // if plan is not valid for transformation then return same plan if (!needAnalysis) { plan } else { - // create buffer to collect all the column and its metadata information - val list = scala.collection.mutable.HashSet.empty[QueryColumn] - var isValidPlan = true - val carbonTable = plan match { - // matching the plan based on supported plan - // if plan is matches with any case it will validate and get all - // information required for transforming the plan + val updatedPlan = transformPreAggQueryPlan(plan) + val newPlan = updatePlan(updatedPlan) + print(newPlan.toString()) + newPlan + } + } - // When plan has grouping expression, aggregate expression - // subquery - case Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - carbonTable + /** + * Below method will be used to update the child plan + * This will be used for updating expression like join condition, + * order by, project list etc + * @param plan + * child plan + * @return updated plan + */ + def updatePlan(plan: LogicalPlan) : LogicalPlan = { + val updatedPlan = plan transform { + case Aggregate(grp, aggExp, child) => + Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child) + case Filter(filterExp, child) => + Filter(updateConditionExpression(Some(filterExp)).get, child) + case Project(projectList, child) => + Project(updateNamedExpression(projectList), child) + case Sort(sortOrders, global, child) => + Sort(updateSortExpression(sortOrders), global, child) + case Join(left, right, joinType, condition) => + Join(left, right, joinType, updateConditionExpression(condition)) + } + updatedPlan + } - // below case for handling filter query - // When plan has grouping expression, aggregate expression - // filter expression - case Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) - } - // getting the columns from filter expression - if(isValidPlan) { - isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) + /** + * Below method will be used to update the sort expression + * @param sortExp + * sort order expression in query + * @return updated sort expression + */ + def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = { + sortExp map { order => + order.child match { + case attr: AttributeReference => + val childExp = updatedExpression.find { p => p._1.sameRef(attr) } + if (childExp.isDefined) { + val newExpression = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + SortOrder(newExpression, order.direction) + } else { + SortOrder(attr, order.direction) } - carbonTable + } + } + } - // When plan has grouping expression, aggregate expression - // logical relation - case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - carbonTable - // case for handling aggregation, order by - case Project(projectList, - Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - list ++ - extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName) - } - carbonTable - // case for handling aggregation, order by and filter - case Project(projectList, - Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) + /** + * Below method will be used to update the expression like group by expression + * @param expressions + * sequence of expression like group by + * @return updated expressions + */ + def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = { + val newExp = expressions map { expression => + expression transform { + case attr: AttributeReference => + val childExp = updatedExpression.find { p => p._1.sameRef(attr) } + if (childExp.isDefined) { + val newExpression = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + newExpression + } else { + attr } - if (isValidPlan) { - list ++ - extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName) - isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) + } + } + newExp + } + + /** + * Below method will be used to updated the named expression like aggregate expression + * @param namedExpression + * any named expression like aggregate expression + * @return updated named expression + */ + def updateNamedExpression(namedExpression: Seq[NamedExpression]) : Seq[NamedExpression] = { + namedExpression map { + case attr: AttributeReference => + val childExp = updatedExpression.find(p => p._1.sameRef(attr)) + if(childExp.isDefined) { + val newExp = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + newExp + } else { + attr + } + case alias@Alias(exp, name) => + val newExp = exp.transform { + case attr: AttributeReference => + val childExp = updatedExpression.find(p => p._1.sameRef(attr)) + if (childExp.isDefined) { + val newExp = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + newExp + } else { + attr + } + } + Alias(newExp, name)(alias.exprId, alias.qualifier, Some(alias.metadata), alias.isGenerated) + } + } + + /** + * Below method will be used to updated condition expression + * @param conditionExp + * any condition expression join condition or filter condition + * @return updated condition expression + */ + def updateConditionExpression(conditionExp: Option[Expression]): Option[Expression] = { + if (conditionExp.isDefined) { + val filterExp = conditionExp.get + Some(filterExp.transform { + case attr: AttributeReference => + val childExp = updatedExpression.find(p => p._1.sameRef(attr)) + if(childExp.isDefined) { + childExp.get._2 + } else { + attr } - carbonTable - // case for handling aggregation with order by when only projection column exits - case Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, + }) + } else { + conditionExp + } + } + + /** + * Below method will be used to validate and transform the main table plan to child table plan + * rules for transforming is as below. + * 1. Grouping expression rules + * 1.1 Change the parent attribute reference for of group expression + * to child attribute reference + * + * 2. Aggregate expression rules + * 2.1 Change the parent attribute reference for of group expression to + * child attribute reference + * 2.2 Change the count AggregateExpression to Sum as count + * is already calculated so in case of aggregate table + * we need to apply sum to get the count + * 2.2 In case of average aggregate function select 2 columns from aggregate table with + * aggregation sum and count. Then add divide(sum(column with sum), sum(column with count)). + * Note: During aggregate table creation for average table will be created with two columns + * one for sum(column) and count(column) to support rollup + * 3. Filter Expression rules. + * 3.1 Updated filter expression attributes with child table attributes + * 4. Update the Parent Logical relation with child Logical relation + * 5. timeseries function + * 5.1 validate parent table has timeseries datamap + * 5.2 timeseries function is valid function or not + * + * @param logicalPlan + * parent logical plan + * @return transformed plan + */ + def transformPreAggQueryPlan(logicalPlan: LogicalPlan): LogicalPlan = { + val updatedPlan = logicalPlan.transform { + // case for aggregation query + case agg@Aggregate(grExp, --- End diff -- MOve down `grExp` to next line --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159443569 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule needAnalysis = false attr } + if(needAnalysis) { + needAnalysis = isValidPlan(plan) + } // if plan is not valid for transformation then return same plan if (!needAnalysis) { plan } else { - // create buffer to collect all the column and its metadata information - val list = scala.collection.mutable.HashSet.empty[QueryColumn] - var isValidPlan = true - val carbonTable = plan match { - // matching the plan based on supported plan - // if plan is matches with any case it will validate and get all - // information required for transforming the plan + val updatedPlan = transformPreAggQueryPlan(plan) + val newPlan = updatePlan(updatedPlan) + print(newPlan.toString()) + newPlan + } + } - // When plan has grouping expression, aggregate expression - // subquery - case Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - carbonTable + /** + * Below method will be used to update the child plan + * This will be used for updating expression like join condition, + * order by, project list etc + * @param plan + * child plan + * @return updated plan + */ + def updatePlan(plan: LogicalPlan) : LogicalPlan = { + val updatedPlan = plan transform { + case Aggregate(grp, aggExp, child) => + Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child) + case Filter(filterExp, child) => + Filter(updateConditionExpression(Some(filterExp)).get, child) + case Project(projectList, child) => + Project(updateNamedExpression(projectList), child) + case Sort(sortOrders, global, child) => + Sort(updateSortExpression(sortOrders), global, child) + case Join(left, right, joinType, condition) => + Join(left, right, joinType, updateConditionExpression(condition)) + } + updatedPlan + } - // below case for handling filter query - // When plan has grouping expression, aggregate expression - // filter expression - case Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) - } - // getting the columns from filter expression - if(isValidPlan) { - isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) + /** + * Below method will be used to update the sort expression + * @param sortExp + * sort order expression in query + * @return updated sort expression + */ + def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = { + sortExp map { order => + order.child match { + case attr: AttributeReference => + val childExp = updatedExpression.find { p => p._1.sameRef(attr) } + if (childExp.isDefined) { + val newExpression = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + SortOrder(newExpression, order.direction) + } else { + SortOrder(attr, order.direction) } - carbonTable + } + } + } - // When plan has grouping expression, aggregate expression - // logical relation - case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - carbonTable - // case for handling aggregation, order by - case Project(projectList, - Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - list ++ - extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName) - } - carbonTable - // case for handling aggregation, order by and filter - case Project(projectList, - Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) + /** + * Below method will be used to update the expression like group by expression + * @param expressions + * sequence of expression like group by + * @return updated expressions + */ + def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = { + val newExp = expressions map { expression => + expression transform { + case attr: AttributeReference => + val childExp = updatedExpression.find { p => p._1.sameRef(attr) } + if (childExp.isDefined) { + val newExpression = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + newExpression + } else { + attr } - if (isValidPlan) { - list ++ - extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName) - isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) + } + } + newExp + } + + /** + * Below method will be used to updated the named expression like aggregate expression + * @param namedExpression + * any named expression like aggregate expression + * @return updated named expression + */ + def updateNamedExpression(namedExpression: Seq[NamedExpression]) : Seq[NamedExpression] = { + namedExpression map { + case attr: AttributeReference => + val childExp = updatedExpression.find(p => p._1.sameRef(attr)) + if(childExp.isDefined) { + val newExp = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + newExp + } else { + attr + } + case alias@Alias(exp, name) => + val newExp = exp.transform { + case attr: AttributeReference => + val childExp = updatedExpression.find(p => p._1.sameRef(attr)) + if (childExp.isDefined) { + val newExp = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + newExp + } else { + attr + } + } + Alias(newExp, name)(alias.exprId, alias.qualifier, Some(alias.metadata), alias.isGenerated) + } + } + + /** + * Below method will be used to updated condition expression + * @param conditionExp + * any condition expression join condition or filter condition + * @return updated condition expression + */ + def updateConditionExpression(conditionExp: Option[Expression]): Option[Expression] = { + if (conditionExp.isDefined) { + val filterExp = conditionExp.get + Some(filterExp.transform { + case attr: AttributeReference => + val childExp = updatedExpression.find(p => p._1.sameRef(attr)) + if(childExp.isDefined) { + childExp.get._2 + } else { + attr } - carbonTable - // case for handling aggregation with order by when only projection column exits - case Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, + }) + } else { + conditionExp + } + } + + /** + * Below method will be used to validate and transform the main table plan to child table plan + * rules for transforming is as below. + * 1. Grouping expression rules + * 1.1 Change the parent attribute reference for of group expression + * to child attribute reference + * + * 2. Aggregate expression rules + * 2.1 Change the parent attribute reference for of group expression to + * child attribute reference + * 2.2 Change the count AggregateExpression to Sum as count + * is already calculated so in case of aggregate table + * we need to apply sum to get the count + * 2.2 In case of average aggregate function select 2 columns from aggregate table with + * aggregation sum and count. Then add divide(sum(column with sum), sum(column with count)). + * Note: During aggregate table creation for average table will be created with two columns + * one for sum(column) and count(column) to support rollup + * 3. Filter Expression rules. + * 3.1 Updated filter expression attributes with child table attributes + * 4. Update the Parent Logical relation with child Logical relation + * 5. timeseries function + * 5.1 validate parent table has timeseries datamap + * 5.2 timeseries function is valid function or not + * + * @param logicalPlan + * parent logical plan + * @return transformed plan + */ + def transformPreAggQueryPlan(logicalPlan: LogicalPlan): LogicalPlan = { + val updatedPlan = logicalPlan.transform { + // case for aggregation query + case agg@Aggregate(grExp, + aggExp, + child@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) --- End diff -- Indentation is wrong --- |
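Taken together, this comment and the earlier `grExp` comment appear to ask for each pattern argument of a long `case` to sit on its own line with consistent indentation. A toy illustration of that layout follows; the types are invented solely to keep the fragment compilable on its own and are not the CarbonData classes.

```scala
// Invented toy types, only so this layout example stands alone.
sealed trait Plan
case class Aggregate(grExp: Seq[String], aggExp: Seq[String], child: Plan) extends Plan
case class SubqueryAlias(alias: String, child: Plan) extends Plan
case class Relation(table: String) extends Plan

def describe(plan: Plan): String = plan match {
  // each pattern argument of the long case moved down to its own line
  case agg @ Aggregate(
      grExp,
      aggExp,
      child @ SubqueryAlias(_, Relation(table))) =>
    s"aggregate over $table: ${grExp.size} grouping, ${aggExp.size} aggregate expressions"
  case _ =>
    "other plan"
}
```
---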
In reply to this post by qiuchenjian-2
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159457280 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule needAnalysis = false attr } + if(needAnalysis) { + needAnalysis = isValidPlan(plan) + } // if plan is not valid for transformation then return same plan if (!needAnalysis) { plan } else { - // create buffer to collect all the column and its metadata information - val list = scala.collection.mutable.HashSet.empty[QueryColumn] - var isValidPlan = true - val carbonTable = plan match { - // matching the plan based on supported plan - // if plan is matches with any case it will validate and get all - // information required for transforming the plan + val updatedPlan = transformPreAggQueryPlan(plan) + val newPlan = updatePlan(updatedPlan) + print(newPlan.toString()) + newPlan + } + } - // When plan has grouping expression, aggregate expression - // subquery - case Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - carbonTable + /** + * Below method will be used to update the child plan + * This will be used for updating expression like join condition, + * order by, project list etc + * @param plan + * child plan + * @return updated plan + */ + def updatePlan(plan: LogicalPlan) : LogicalPlan = { + val updatedPlan = plan transform { + case Aggregate(grp, aggExp, child) => + Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child) + case Filter(filterExp, child) => + Filter(updateConditionExpression(Some(filterExp)).get, child) + case Project(projectList, child) => + Project(updateNamedExpression(projectList), child) + case Sort(sortOrders, global, child) => + Sort(updateSortExpression(sortOrders), global, child) + case Join(left, right, joinType, condition) => + Join(left, right, joinType, updateConditionExpression(condition)) + } + updatedPlan + } - // below case for handling filter query - // When plan has grouping expression, aggregate expression - // filter expression - case Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) - } - // getting the columns from filter expression - if(isValidPlan) { - isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) + /** + * Below method will be used to update the sort expression + * @param sortExp + * sort order expression in query + * @return updated sort expression + */ + def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = { + sortExp map { order => + order.child match { + case attr: AttributeReference => + val childExp = updatedExpression.find { p => p._1.sameRef(attr) } + if (childExp.isDefined) { + val newExpression = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) --- End diff -- childExp.qualifier will not have table alias name in case of join we need to table alias name --- |
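To make the qualifier concern concrete: when a parent attribute is swapped for its child (data map) attribute, the qualifier should be taken from the attribute as it appears in the query, because in a join the child attribute's qualifier does not carry the table alias. A hedged sketch of such a substitution, with a hypothetical helper name:

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference

// Sketch: build the child-table attribute but keep the query-side qualifier,
// which is what carries the table alias in join queries.
def toChildAttribute(
    queryAttr: AttributeReference,
    childAttr: AttributeReference): AttributeReference = {
  AttributeReference(
    childAttr.name,
    childAttr.dataType,
    childAttr.nullable,
    childAttr.metadata)(
    childAttr.exprId,
    queryAttr.qualifier, // not childAttr.qualifier: that would drop the alias
    queryAttr.isGenerated)
}
```
---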
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159457864 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -806,67 +824,105 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule val updatedAggExp = aggregateExpressions.map { --- End diff -- It seems there is a lot of duplicate code; it would be better to do a transform and use a single `case attr: AttributeReference` to handle all of it here ---
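A minimal sketch of the consolidation suggested here, assuming `updatedExpression` is the parent-to-child attribute mapping the rule already maintains; the helper name is hypothetical. One shared transform performs the `AttributeReference` substitution, and the grouping, aggregate, sort and condition paths can all delegate to it.

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}

// Sketch: one shared transform that swaps parent attributes for their child
// (data map) counterparts; grouping, aggregate, sort and condition expressions
// can all be passed through it instead of repeating this logic.
private def replaceParentAttributes(expression: Expression): Expression = {
  expression transform {
    case attr: AttributeReference =>
      updatedExpression.find { case (parent, _) => parent.sameRef(attr) } match {
        case Some((_, childAttr)) =>
          AttributeReference(
            childAttr.name,
            childAttr.dataType,
            childAttr.nullable,
            childAttr.metadata)(childAttr.exprId, attr.qualifier, attr.isGenerated)
        case None => attr
      }
  }
}
```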