GitHub user kumarvishal09 opened a pull request:
https://github.com/apache/carbondata/pull/1728 [CARBONDATA-1926] Expression support inside aggregate function for Query This PR supports transforming the query plan for an aggregate table when the query's aggregate function contains an expression. - [ ] Any interfaces changed? NA - [ ] Any backward compatibility impacted? NA - [ ] Document update required? NA - [ ] Testing done: added UTs to validate pre-aggregate table selection and data validation - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. You can merge this pull request into a Git repository by running: $ git pull https://github.com/kumarvishal09/incubator-carbondata ExpressionSupportInQuery Alternatively you can review and apply these changes as the patch at: https://github.com/apache/carbondata/pull/1728.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1728 ---- commit 361790641779d881e21f54d7c0f78c19fcf3490e Author: kumarvishal <kumarvishal.1802@...> Date: 2017-12-20T10:16:02Z Added code to support expression inside aggregate function commit 3344f0fd3dd5cca31e26d2f1909fce600e8dd8e8 Author: kumarvishal <kumarvishal.1802@...> Date: 2017-12-25T09:34:39Z Added code to support expression inside aggregateExpression in query ---- --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2364/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728 Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1152/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1728 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2578/ --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r158934173 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala --- @@ -0,0 +1,105 @@ +package org.apache.carbondata.integration.spark.testsuite.preaggregate + +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Row} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll { + + override def beforeAll: Unit = { + sql("drop table if exists mainTable") + sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'") + sql("create datamap agg0 on table mainTable using 'preaggregate' as select name,count(age) from mainTable group by name") + sql("create datamap agg1 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end) from mainTable group by name") + sql("create datamap agg2 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end),city from mainTable group by name,city") + sql("create datamap agg3 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end) from mainTable group by name") + sql("create datamap agg4 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from mainTable group by name") + sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable") + } + + test("test pre agg create table with expression 1") { + checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0"), true, "maintable_age_count") + } + + test("test pre agg 
create table with expression 2") { + checkExistence(sql("DESCRIBE FORMATTED mainTable_agg1"), true, "maintable_column_0_sum") + } + + test("test pre agg create table with expression 3") { + checkExistence(sql("DESCRIBE FORMATTED mainTable_agg2"), true, "maintable_column_0_sum") + } + + test("test pre agg create table with expression 4") { + checkExistence(sql("DESCRIBE FORMATTED mainTable_agg3"), true, "maintable_column_0_sum") + } + + test("test pre agg create table with expression 5") { + checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_0_sum") + checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_1_sum") + } + + test("test pre agg table selection with expression 1") { + val df = sql("select name as NewName, count(age) as sum from mainTable group by name order by name") + preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0") + } + + + test("test pre agg table selection with expression 2") { + val df = sql("select name as NewName, sum(case when age=35 then id else 0 end) as sum from mainTable group by name order by name") + preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1") + } + + test("test pre agg table selection with expression 3") { + val df = sql("select sum(case when age=35 then id else 0 end) from maintable") + checkAnswer(df, Seq(Row(6.0))) + } + + test("test pre agg table selection with expression 4") { + val df = sql("select sum(case when age=27 then id else 0 end) from maintable") + preAggTableValidator(df.queryExecution.analyzed, "maintable_agg3") + checkAnswer(df, Seq(Row(2.0))) + } + + test("test pre agg table selection with expression 5") { + val df = sql("select sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from maintable") + preAggTableValidator(df.queryExecution.analyzed, "maintable_agg4") + checkAnswer(df, Seq(Row(2.0,6.0))) + } + + def preAggTableValidator(plan: LogicalPlan, actualTableName: String) : Unit ={ --- End 
diff -- Please add a comment describing this function. --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r158934260 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala --- @@ -0,0 +1,105 @@ +package org.apache.carbondata.integration.spark.testsuite.preaggregate + +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Row} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll { + + override def beforeAll: Unit = { + sql("drop table if exists mainTable") + sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'") + sql("create datamap agg0 on table mainTable using 'preaggregate' as select name,count(age) from mainTable group by name") + sql("create datamap agg1 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end) from mainTable group by name") + sql("create datamap agg2 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end),city from mainTable group by name,city") + sql("create datamap agg3 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end) from mainTable group by name") + sql("create datamap agg4 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from mainTable group by name") + sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable") + } + + test("test pre agg create table with expression 1") { + checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0"), true, "maintable_age_count") + } + + test("test pre agg 
create table with expression 2") { + checkExistence(sql("DESCRIBE FORMATTED mainTable_agg1"), true, "maintable_column_0_sum") + } + + test("test pre agg create table with expression 3") { + checkExistence(sql("DESCRIBE FORMATTED mainTable_agg2"), true, "maintable_column_0_sum") + } + + test("test pre agg create table with expression 4") { + checkExistence(sql("DESCRIBE FORMATTED mainTable_agg3"), true, "maintable_column_0_sum") + } + + test("test pre agg create table with expression 5") { + checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_0_sum") + checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_1_sum") + } + + test("test pre agg table selection with expression 1") { + val df = sql("select name as NewName, count(age) as sum from mainTable group by name order by name") + preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0") + } + + + test("test pre agg table selection with expression 2") { + val df = sql("select name as NewName, sum(case when age=35 then id else 0 end) as sum from mainTable group by name order by name") + preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1") + } + + test("test pre agg table selection with expression 3") { + val df = sql("select sum(case when age=35 then id else 0 end) from maintable") + checkAnswer(df, Seq(Row(6.0))) + } + + test("test pre agg table selection with expression 4") { + val df = sql("select sum(case when age=27 then id else 0 end) from maintable") + preAggTableValidator(df.queryExecution.analyzed, "maintable_agg3") + checkAnswer(df, Seq(Row(2.0))) + } + + test("test pre agg table selection with expression 5") { + val df = sql("select sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from maintable") + preAggTableValidator(df.queryExecution.analyzed, "maintable_agg4") + checkAnswer(df, Seq(Row(2.0,6.0))) + } + --- End diff -- add a testcase for subquery also --- |
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r158934363 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -126,16 +127,17 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule aggregateExp, carbonTable, tableName, - list) - carbonTable + list, + aggregateExpressions) + (carbonTable, logicalRelation) // below case for handling filter query // When plan has grouping expression, aggregate expression // filter expression case Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) + aggregateExp, --- End diff -- unnecessary change --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2421/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728 Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1197/ --- |
In reply to this post by qiuchenjian-2
Github user kumarvishal09 commented on the issue:
https://github.com/apache/carbondata/pull/1728 Retest this please --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1728 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2612/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2426/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1728 Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1202/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1728 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2625/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159051463 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java --- @@ -62,6 +62,12 @@ */ private int ordinal = Integer.MAX_VALUE; + /** --- End diff -- Remove unnecessary attributes like parentColumnToAggregationsMapping --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159052141 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -330,6 +207,264 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule } } + /** + * Below method will be used to validate the logical plan + * and get all the details from to select proper aggregate table + * @param logicalPlan + * actual query logical plan + * @param list + * list of projection column present in plan + * @param qAggExprs + * list of aggregate expression + * @return if plan is valid for tranformation, parent table, parent logical relaion + */ + def validatePlanAndGetFields(logicalPlan: LogicalPlan, + list: scala.collection.mutable.HashSet[QueryColumn], + qAggExprs: scala.collection.mutable.HashSet[AggregateExpression]): (Boolean, + CarbonTable, LogicalRelation) = { + var isValidPlan = false + var pTable: CarbonTable = null + var qLRelation: LogicalRelation = null + logicalPlan.transform { + // to handle filter expression + case filter@Filter(filterExp, + CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) + // only carbon query plan is supported checking whether logical relation is + // is for carbon + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
+ metaData.hasAggregateDataMapSchema => + qLRelation = logicalRelation + pTable = getCarbonTableAndTableName(logicalRelation) + // getting the columns from filter expression + if (!CarbonReflectionUtils.hasPredicateSubquery(filterExp)) { + isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, pTable) + } + filter + // to handle aggregate expression + case agg@Aggregate(groupingExp, + aggregateExp, + CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) + // only carbon query plan is supported checking whether logical relation is + // is for carbon + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. + metaData.hasAggregateDataMapSchema => + qLRelation = logicalRelation + pTable = getCarbonTableAndTableName(logicalRelation) + isValidPlan = extractQueryColumnsFromAggExpression( + groupingExp, + aggregateExp, + pTable, + list, + qAggExprs) + agg + // to handle aggregate expression with filter + case agg@Aggregate(grExp, aggExp, filter@Filter(_, _)) => + val out = validatePlanAndGetFields(filter, list, qAggExprs) + pTable = out._2 + qLRelation = out._3 + isValidPlan = out._1 + if (isValidPlan) { + isValidPlan = extractQueryColumnsFromAggExpression(grExp, aggExp, pTable, list, qAggExprs) + } + agg + // to handle projection with order by + case proj@Project(projectList, sort@Sort(_, _, _)) => + val out = validatePlanAndGetFields(sort, list, qAggExprs) + pTable = out._2 + qLRelation = out._3 + isValidPlan = out._1 + if(isValidPlan) { + list ++ extractQueryColumnForOrderBy(Some(projectList), Seq.empty, pTable) + } + proj + // to handle only projection + case proj@Project(projectList, agg@Aggregate(_, _, _)) => + val out = validatePlanAndGetFields(agg, list, qAggExprs) + pTable = out._2 + qLRelation = out._3 + isValidPlan = out._1 + if(isValidPlan) { + list ++ extractQueryColumnForOrderBy(Some(projectList), Seq.empty, pTable) + } + proj + // case for 
handling aggregation with order by when only projection column exits + case sort@Sort(sortOrders, _, agg@Aggregate(_, _, _)) => + val out = validatePlanAndGetFields(agg, list, qAggExprs) + pTable = out._2 + qLRelation = out._3 + isValidPlan = out._1 + if(isValidPlan) { + list ++ + extractQueryColumnForOrderBy(None, sortOrders, pTable) + } + sort + } + (isValidPlan, pTable, qLRelation) + } + + /** + * Below method will be used to validate aggregate expression with the data map + * and will return the selected valid data maps + * @param selectedDataMap + * list of data maps + * @param carbonTable + * parent carbon table + * @param logicalRelation + * parent logical relation + * @param queryAggExpLogicalPlans + * query agg expression logical plan + * @return valid data map + */ + def validateAggregateExpression(selectedDataMap: Seq[DataMapSchema], + carbonTable: CarbonTable, + logicalRelation: LogicalRelation, + queryAggExpLogicalPlans: Seq[LogicalPlan]): Seq[DataMapSchema] = { + def validateDataMap(dataMap: DataMapSchema, + aggExpLogicalPlans: Seq[LogicalPlan]): Boolean = { + val schemaAggLogicalPlan = getLogicalPlanForAggregateExpression(dataMap, + carbonTable, + logicalRelation) + aggExpLogicalPlans.forall{ + p => schemaAggLogicalPlan.exists(m => p.sameResult(m._1)) + } + } + val selectedDataMapSchema = selectedDataMap.collect { + case dataMap if validateDataMap(dataMap, queryAggExpLogicalPlans) => + dataMap + } + selectedDataMapSchema + } + + /** + * Below method will be used to update the logical plan of expression + * with parent table logical relation + * @param logicalPlan + * @param logicalRelation + * @return + */ + def updateLogicalRelation(logicalPlan: LogicalPlan, + logicalRelation: LogicalRelation): LogicalPlan = { + logicalPlan transform { + case l: LogicalRelation => + l.copy(relation = logicalRelation.relation) + } + } + + /** + * Below method will be used to to get the logical plan for each aggregate expression in + * child data map and its column 
schema mapping if mapping is already present + * then it will use the same otherwise it will generate and stored in aggregation data map + * @param selectedDataMap + * child data map + * @param carbonTable + * parent table + * @param logicalRelation + * logical relation of actual plan + * @return map of logical plan for each aggregate expression in child query and its column mapping + */ + def getLogicalPlanForAggregateExpression(selectedDataMap: DataMapSchema, carbonTable: CarbonTable, + logicalRelation: LogicalRelation): Map[LogicalPlan, ColumnSchema] = { + val aggDataMapSchema = selectedDataMap.asInstanceOf[AggregationDataMapSchema] + // if column mapping is not present + if (null == aggDataMapSchema.getAggregateExpressionToColumnMapping) { + // add preAGG UDF to avoid all the PreAggregate rule + val childDataMapQueryString = new CarbonSpark2SqlParser() + .addPreAggFunction(aggDataMapSchema.getProperties.get("CHILD_SELECT QUERY")) + // get the logical plan + val aggPlan = sparkSession.sql(childDataMapQueryString).logicalPlan + // getting all aggregate expression from query + val dataMapAggExp = getAggregateExpFromChildDataMap(aggPlan) + // in case of average child table will have two columns which will be stored in sequence + // so for average expression we need to get two columns for mapping + var counter = 0 + // sorting the columns based on schema ordinal so search will give proper result + val sortedColumnList = aggDataMapSchema.getChildSchema.getListOfColumns.asScala + .sortBy(_.getSchemaOrdinal) + val logicalPlanToColumnMapping = dataMapAggExp.map { aggExp => + // for each aggregate expression get logical plan + val expLogicalPlan = getLogicalPlanFromAggExp(aggExp, + carbonTable.getTableName, + carbonTable.getDatabaseName, logicalRelation) + // check if aggregate expression is of type avg + // get the columns + var columnSchema = aggDataMapSchema + .getAggColumnBasedOnIndex(counter, sortedColumnList.asJava) + // increment the counter so when for next 
expression above code will be + // executed it will search from that schema ordinal + counter = columnSchema.getSchemaOrdinal + 1 + (expLogicalPlan, columnSchema) + }.toMap + // store the mapping in data map + aggDataMapSchema.setAggregateExpressionToColumnMapping(logicalPlanToColumnMapping.asJava) + // return the mapping + logicalPlanToColumnMapping + } else { + // if already present in data map then return the same + aggDataMapSchema.getAggregateExpressionToColumnMapping + .asInstanceOf[java.util.Map[LogicalPlan, ColumnSchema]].asScala.toMap + } + } + + + /** + * Below method will be used to get the logical plan from aggregate expression + * @param aggExp + * aggregate expression + * @param tableName + * parent table name + * @param databaseName + * database name + * @param logicalRelation + * logical relation + * @return logical plan + */ + def getLogicalPlanFromAggExp(aggExp: AggregateExpression, + tableName: String, + databaseName: String, + logicalRelation: LogicalRelation): LogicalPlan = { + // adding the preAGG UDF, so pre aggregate data loading rule and query rule will not + // be applied + val query = new CarbonSpark2SqlParser() + .addPreAggFunction(s"Select ${ aggExp.sql } from $databaseName.$tableName") + // updating the logical relation of logical plan to so when two logical plan + // will be compared it will not consider relation + updateLogicalRelation(sparkSession.sql(query).logicalPlan, logicalRelation) + } + + /** + * Below method will be used to get aggregate expression + * @param logicalPlan + * logical plan + * @return list of aggregate expression + */ + def getAggregateExpFromChildDataMap(logicalPlan: LogicalPlan): Seq[AggregateExpression] = { + val list = scala.collection.mutable.HashSet.empty[AggregateExpression] --- End diff -- Should use a List or LinkedHashSet, as the order of insertion is required for the mapping --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159053493 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -330,6 +207,264 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule } } + /** + * Below method will be used to validate the logical plan + * and get all the details from to select proper aggregate table + * @param logicalPlan + * actual query logical plan + * @param list + * list of projection column present in plan + * @param qAggExprs + * list of aggregate expression + * @return if plan is valid for tranformation, parent table, parent logical relaion + */ + def validatePlanAndGetFields(logicalPlan: LogicalPlan, + list: scala.collection.mutable.HashSet[QueryColumn], + qAggExprs: scala.collection.mutable.HashSet[AggregateExpression]): (Boolean, + CarbonTable, LogicalRelation) = { + var isValidPlan = false + var pTable: CarbonTable = null + var qLRelation: LogicalRelation = null + logicalPlan.transform { + // to handle filter expression + case filter@Filter(filterExp, + CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) + // only carbon query plan is supported checking whether logical relation is + // is for carbon + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
+ metaData.hasAggregateDataMapSchema => + qLRelation = logicalRelation + pTable = getCarbonTableAndTableName(logicalRelation) + // getting the columns from filter expression + if (!CarbonReflectionUtils.hasPredicateSubquery(filterExp)) { + isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, pTable) + } + filter + // to handle aggregate expression + case agg@Aggregate(groupingExp, + aggregateExp, + CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) + // only carbon query plan is supported checking whether logical relation is + // is for carbon + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. + metaData.hasAggregateDataMapSchema => + qLRelation = logicalRelation + pTable = getCarbonTableAndTableName(logicalRelation) + isValidPlan = extractQueryColumnsFromAggExpression( + groupingExp, + aggregateExp, + pTable, + list, + qAggExprs) + agg + // to handle aggregate expression with filter + case agg@Aggregate(grExp, aggExp, filter@Filter(_, _)) => + val out = validatePlanAndGetFields(filter, list, qAggExprs) + pTable = out._2 + qLRelation = out._3 + isValidPlan = out._1 + if (isValidPlan) { + isValidPlan = extractQueryColumnsFromAggExpression(grExp, aggExp, pTable, list, qAggExprs) + } + agg + // to handle projection with order by + case proj@Project(projectList, sort@Sort(_, _, _)) => + val out = validatePlanAndGetFields(sort, list, qAggExprs) + pTable = out._2 + qLRelation = out._3 + isValidPlan = out._1 + if(isValidPlan) { + list ++ extractQueryColumnForOrderBy(Some(projectList), Seq.empty, pTable) + } + proj + // to handle only projection + case proj@Project(projectList, agg@Aggregate(_, _, _)) => + val out = validatePlanAndGetFields(agg, list, qAggExprs) + pTable = out._2 + qLRelation = out._3 + isValidPlan = out._1 + if(isValidPlan) { + list ++ extractQueryColumnForOrderBy(Some(projectList), Seq.empty, pTable) + } + proj + // case for 
handling aggregation with order by when only projection column exits + case sort@Sort(sortOrders, _, agg@Aggregate(_, _, _)) => + val out = validatePlanAndGetFields(agg, list, qAggExprs) + pTable = out._2 + qLRelation = out._3 + isValidPlan = out._1 + if(isValidPlan) { + list ++ + extractQueryColumnForOrderBy(None, sortOrders, pTable) + } + sort + } + (isValidPlan, pTable, qLRelation) + } + + /** + * Below method will be used to validate aggregate expression with the data map + * and will return the selected valid data maps + * @param selectedDataMap + * list of data maps + * @param carbonTable + * parent carbon table + * @param logicalRelation + * parent logical relation + * @param queryAggExpLogicalPlans + * query agg expression logical plan + * @return valid data map + */ + def validateAggregateExpression(selectedDataMap: Seq[DataMapSchema], + carbonTable: CarbonTable, + logicalRelation: LogicalRelation, + queryAggExpLogicalPlans: Seq[LogicalPlan]): Seq[DataMapSchema] = { + def validateDataMap(dataMap: DataMapSchema, + aggExpLogicalPlans: Seq[LogicalPlan]): Boolean = { + val schemaAggLogicalPlan = getLogicalPlanForAggregateExpression(dataMap, + carbonTable, + logicalRelation) + aggExpLogicalPlans.forall{ + p => schemaAggLogicalPlan.exists(m => p.sameResult(m._1)) + } + } + val selectedDataMapSchema = selectedDataMap.collect { + case dataMap if validateDataMap(dataMap, queryAggExpLogicalPlans) => + dataMap + } + selectedDataMapSchema + } + + /** + * Below method will be used to update the logical plan of expression + * with parent table logical relation + * @param logicalPlan + * @param logicalRelation + * @return + */ + def updateLogicalRelation(logicalPlan: LogicalPlan, + logicalRelation: LogicalRelation): LogicalPlan = { + logicalPlan transform { + case l: LogicalRelation => + l.copy(relation = logicalRelation.relation) + } + } + + /** + * Below method will be used to to get the logical plan for each aggregate expression in + * child data map and its column 
schema mapping if mapping is already present + * then it will use the same otherwise it will generate and stored in aggregation data map + * @param selectedDataMap + * child data map + * @param carbonTable + * parent table + * @param logicalRelation + * logical relation of actual plan + * @return map of logical plan for each aggregate expression in child query and its column mapping + */ + def getLogicalPlanForAggregateExpression(selectedDataMap: DataMapSchema, carbonTable: CarbonTable, + logicalRelation: LogicalRelation): Map[LogicalPlan, ColumnSchema] = { + val aggDataMapSchema = selectedDataMap.asInstanceOf[AggregationDataMapSchema] + // if column mapping is not present + if (null == aggDataMapSchema.getAggregateExpressionToColumnMapping) { + // add preAGG UDF to avoid all the PreAggregate rule + val childDataMapQueryString = new CarbonSpark2SqlParser() + .addPreAggFunction(aggDataMapSchema.getProperties.get("CHILD_SELECT QUERY")) + // get the logical plan + val aggPlan = sparkSession.sql(childDataMapQueryString).logicalPlan + // getting all aggregate expression from query + val dataMapAggExp = getAggregateExpFromChildDataMap(aggPlan) + // in case of average child table will have two columns which will be stored in sequence + // so for average expression we need to get two columns for mapping + var counter = 0 + // sorting the columns based on schema ordinal so search will give proper result + val sortedColumnList = aggDataMapSchema.getChildSchema.getListOfColumns.asScala + .sortBy(_.getSchemaOrdinal) + val logicalPlanToColumnMapping = dataMapAggExp.map { aggExp => + // for each aggregate expression get logical plan + val expLogicalPlan = getLogicalPlanFromAggExp(aggExp, + carbonTable.getTableName, + carbonTable.getDatabaseName, logicalRelation) + // check if aggregate expression is of type avg + // get the columns + var columnSchema = aggDataMapSchema + .getAggColumnBasedOnIndex(counter, sortedColumnList.asJava) + // increment the counter so when for next 
expression above code will be + // executed it will search from that schema ordinal + counter = columnSchema.getSchemaOrdinal + 1 + (expLogicalPlan, columnSchema) + }.toMap + // store the mapping in data map + aggDataMapSchema.setAggregateExpressionToColumnMapping(logicalPlanToColumnMapping.asJava) + // return the mapping + logicalPlanToColumnMapping + } else { + // if already present in data map then return the same + aggDataMapSchema.getAggregateExpressionToColumnMapping + .asInstanceOf[java.util.Map[LogicalPlan, ColumnSchema]].asScala.toMap + } + } + + + /** + * Below method will be used to get the logical plan from aggregate expression + * @param aggExp + * aggregate expression + * @param tableName + * parent table name + * @param databaseName + * database name + * @param logicalRelation + * logical relation + * @return logical plan + */ + def getLogicalPlanFromAggExp(aggExp: AggregateExpression, + tableName: String, + databaseName: String, + logicalRelation: LogicalRelation): LogicalPlan = { + // adding the preAGG UDF, so pre aggregate data loading rule and query rule will not + // be applied + val query = new CarbonSpark2SqlParser() --- End diff -- Don't create parser every time, pass from caller --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159056348 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -330,6 +207,264 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule } } + /** + * Below method will be used to validate the logical plan + * and get all the details from to select proper aggregate table + * @param logicalPlan + * actual query logical plan + * @param list + * list of projection column present in plan + * @param qAggExprs + * list of aggregate expression + * @return if plan is valid for tranformation, parent table, parent logical relaion + */ + def validatePlanAndGetFields(logicalPlan: LogicalPlan, + list: scala.collection.mutable.HashSet[QueryColumn], + qAggExprs: scala.collection.mutable.HashSet[AggregateExpression]): (Boolean, + CarbonTable, LogicalRelation) = { + var isValidPlan = false + var pTable: CarbonTable = null + var qLRelation: LogicalRelation = null + logicalPlan.transform { --- End diff -- It seems that a Join of two plans is not handled here. Please also handle the Join and Union cases using recursion. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159057341 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -282,6 +136,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule val listFilterColumn = list .filter(queryColumn => queryColumn.getAggFunction.isEmpty && queryColumn.isFilterColumn) .toList + val isProjectionColumnPresent = (listProjectionColumn.size + listFilterColumn.size) > 0 // getting all the aggregation columns val listAggregationColumn = list.filter(queryColumn => !queryColumn.getAggFunction.isEmpty) --- End diff -- This code is unused now; please remove all code related to it. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159058055 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -1124,92 +1328,57 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule * * @param carbonTable * parent table - * @param aggFunctions - * aggregation function - * @param tableName - * parent table name + * @param aggExp + * aggregate expression * @return list of fields */ def validateAggregateFunctionAndGetFields(carbonTable: CarbonTable, - aggFunctions: AggregateFunction, - tableName: String - ): Seq[QueryColumn] = { + aggExp: AggregateExpression): Seq[AggregateExpression] = { val changedDataType = true --- End diff -- Remove unused variable --- |
Free forum by Nabble | Edit this page |