Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1464#discussion_r149002531 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala --- @@ -16,17 +16,18 @@ */ package org.apache.spark.sql.execution.command.preaaggregate -import scala.collection.mutable.ListBuffer +import scala.collection.mutable.{ArrayBuffer, ListBuffer} --- End diff -- What is the purpose of the method `prepareSchemaJson`? --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1464#discussion_r149003521 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateQueryRules.scala --- @@ -0,0 +1,756 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{AnalysisException, CarbonDatasourceHadoopRelation, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, Divide, +Expression, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema} +import org.apache.carbondata.core.preagg.{AggregateTableSelector, QueryColumn, QueryPlan} +import org.apache.carbondata.spark.util.CarbonScalaUtil + +/** + * Class for applying Pre Aggregate rules + * Responsibility. + * 1. Check plan is valid plan for updating the parent table plan with child table + * 2. Updated the plan based on child schema + * + * Rules for Upadating the plan + * 1. Grouping expression rules + * 1.1 Change the parent attribute reference for of group expression + * to child attribute reference + * + * 2. Aggregate expression rules + * 2.1 Change the parent attribute reference for of group expression to + * child attribute reference + * 2.2 Change the count AggregateExpression to Sum as count + * is already calculated so in case of aggregate table + * we need to apply sum to get the count + * 2.2 In case of average aggregate function select 2 columns from aggregate table with + * aggregation + * sum and count. Then add divide(sum(column with sum), sum(column with count)). + * Note: During aggregate table creation for average table will be created with two columns + * one for sum(column) and count(column) to support rollup + * + * 3. 
Filter Expression rules. + * 3.1 Updated filter expression attributes with child table attributes + * 4. Update the Parent Logical relation with child Logical relation + * + * @param sparkSession + * spark session + */ +case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = { + var needAnalysis = true + plan.transformExpressions { + // first check if any preAgg scala function is applied it is present is in plan + // then call is from create preaggregate table class so no need to transform the query plan + case al@Alias(_, name) if name.equals("preAgg") => + needAnalysis = false + al + // in case of query if any unresolve alias is present then wait for plan to be resolved + // return the same plan as we can tranform the plan only when everything is resolved + case unresolveAlias@UnresolvedAlias(_, _) => + needAnalysis = false + unresolveAlias + } + // if plan is not valid for transformation then return same plan + if (!needAnalysis) { + plan + } else { + // create buffer to collect all the column and its metadata information + val list = scala.collection.mutable.ListBuffer.empty[QueryColumn] + var isValidPlan = true + val carbonTable = plan match { + // matching the plan based on supported plan + // if plan is matches with any case it will validate and get all + // information required for transforming the plan + + // When plan has grouping expression, aggregate expression + // subquery + case Aggregate(groupingExp, + aggregateExp, + SubqueryAlias(_, logicalRelation: LogicalRelation, _)) + // only carbon query plan is supported checking whether logical relation is + // is for carbon + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] + && + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable + .hasPreAggDataMap => + val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) + // when table has child 
data map(pre aggregate table) then only plan will be transformed + if (!carbonTable.hasPreAggDataMap) { --- End diff -- This `hasPreAggDataMap` check is already performed in the case guard, so there is no need to repeat it here. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1464#discussion_r149003605 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateQueryRules.scala --- @@ -0,0 +1,756 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{AnalysisException, CarbonDatasourceHadoopRelation, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, Divide, +Expression, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema} +import org.apache.carbondata.core.preagg.{AggregateTableSelector, QueryColumn, QueryPlan} +import org.apache.carbondata.spark.util.CarbonScalaUtil + +/** + * Class for applying Pre Aggregate rules + * Responsibility. + * 1. Check plan is valid plan for updating the parent table plan with child table + * 2. Updated the plan based on child schema + * + * Rules for Upadating the plan + * 1. Grouping expression rules + * 1.1 Change the parent attribute reference for of group expression + * to child attribute reference + * + * 2. Aggregate expression rules + * 2.1 Change the parent attribute reference for of group expression to + * child attribute reference + * 2.2 Change the count AggregateExpression to Sum as count + * is already calculated so in case of aggregate table + * we need to apply sum to get the count + * 2.2 In case of average aggregate function select 2 columns from aggregate table with + * aggregation + * sum and count. Then add divide(sum(column with sum), sum(column with count)). + * Note: During aggregate table creation for average table will be created with two columns + * one for sum(column) and count(column) to support rollup + * + * 3. 
Filter Expression rules. + * 3.1 Updated filter expression attributes with child table attributes + * 4. Update the Parent Logical relation with child Logical relation + * + * @param sparkSession + * spark session + */ +case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = { + var needAnalysis = true + plan.transformExpressions { + // first check if any preAgg scala function is applied it is present is in plan + // then call is from create preaggregate table class so no need to transform the query plan + case al@Alias(_, name) if name.equals("preAgg") => + needAnalysis = false + al + // in case of query if any unresolve alias is present then wait for plan to be resolved + // return the same plan as we can tranform the plan only when everything is resolved + case unresolveAlias@UnresolvedAlias(_, _) => + needAnalysis = false + unresolveAlias + } + // if plan is not valid for transformation then return same plan + if (!needAnalysis) { + plan + } else { + // create buffer to collect all the column and its metadata information + val list = scala.collection.mutable.ListBuffer.empty[QueryColumn] + var isValidPlan = true + val carbonTable = plan match { + // matching the plan based on supported plan + // if plan is matches with any case it will validate and get all + // information required for transforming the plan + + // When plan has grouping expression, aggregate expression + // subquery + case Aggregate(groupingExp, + aggregateExp, + SubqueryAlias(_, logicalRelation: LogicalRelation, _)) + // only carbon query plan is supported checking whether logical relation is + // is for carbon + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] + && + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable + .hasPreAggDataMap => + val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) + // when table has child 
data map(pre aggregate table) then only plan will be transformed + if (!carbonTable.hasPreAggDataMap) { + isValidPlan = false + } + if (isValidPlan) { + // if it is valid plan then extract the query columns + isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, + aggregateExp, + carbonTable, + tableName, + list) + } + carbonTable + + // below case for handling filter query + // When plan has grouping expression, aggregate expression + // filter expression + case Aggregate(groupingExp, aggregateExp, + Filter(filterExp, + SubqueryAlias(_, logicalRelation: LogicalRelation, _))) + // only carbon query plan is supported checking whether logical relation is + // is for carbon + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] + && + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable + .hasPreAggDataMap => + val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) + // when table has child data map(pre aggregate table) then only plan will be transformed + if (!carbonTable.hasPreAggDataMap) { --- End diff -- This `hasPreAggDataMap` check is already performed in the case guard, so there is no need to repeat it here. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1464#discussion_r149004253 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateQueryRules.scala --- @@ -0,0 +1,756 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{AnalysisException, CarbonDatasourceHadoopRelation, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, Divide, +Expression, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema} +import org.apache.carbondata.core.preagg.{AggregateTableSelector, QueryColumn, QueryPlan} +import org.apache.carbondata.spark.util.CarbonScalaUtil + +/** + * Class for applying Pre Aggregate rules + * Responsibility. + * 1. Check plan is valid plan for updating the parent table plan with child table + * 2. Updated the plan based on child schema + * + * Rules for Upadating the plan + * 1. Grouping expression rules + * 1.1 Change the parent attribute reference for of group expression + * to child attribute reference + * + * 2. Aggregate expression rules + * 2.1 Change the parent attribute reference for of group expression to + * child attribute reference + * 2.2 Change the count AggregateExpression to Sum as count + * is already calculated so in case of aggregate table + * we need to apply sum to get the count + * 2.2 In case of average aggregate function select 2 columns from aggregate table with + * aggregation + * sum and count. Then add divide(sum(column with sum), sum(column with count)). + * Note: During aggregate table creation for average table will be created with two columns + * one for sum(column) and count(column) to support rollup + * + * 3. 
Filter Expression rules. + * 3.1 Updated filter expression attributes with child table attributes + * 4. Update the Parent Logical relation with child Logical relation + * + * @param sparkSession + * spark session + */ +case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = { + var needAnalysis = true + plan.transformExpressions { + // first check if any preAgg scala function is applied it is present is in plan + // then call is from create preaggregate table class so no need to transform the query plan + case al@Alias(_, name) if name.equals("preAgg") => + needAnalysis = false + al + // in case of query if any unresolve alias is present then wait for plan to be resolved + // return the same plan as we can tranform the plan only when everything is resolved + case unresolveAlias@UnresolvedAlias(_, _) => + needAnalysis = false + unresolveAlias + } + // if plan is not valid for transformation then return same plan + if (!needAnalysis) { + plan + } else { + // create buffer to collect all the column and its metadata information + val list = scala.collection.mutable.ListBuffer.empty[QueryColumn] + var isValidPlan = true + val carbonTable = plan match { + // matching the plan based on supported plan + // if plan is matches with any case it will validate and get all + // information required for transforming the plan + + // When plan has grouping expression, aggregate expression + // subquery + case Aggregate(groupingExp, + aggregateExp, + SubqueryAlias(_, logicalRelation: LogicalRelation, _)) + // only carbon query plan is supported checking whether logical relation is + // is for carbon + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] + && + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable + .hasPreAggDataMap => + val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) + // when table has child 
data map(pre aggregate table) then only plan will be transformed + if (!carbonTable.hasPreAggDataMap) { + isValidPlan = false + } + if (isValidPlan) { + // if it is valid plan then extract the query columns + isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, + aggregateExp, + carbonTable, + tableName, + list) + } + carbonTable + + // below case for handling filter query + // When plan has grouping expression, aggregate expression + // filter expression + case Aggregate(groupingExp, aggregateExp, + Filter(filterExp, + SubqueryAlias(_, logicalRelation: LogicalRelation, _))) + // only carbon query plan is supported checking whether logical relation is + // is for carbon + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] + && + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable + .hasPreAggDataMap => + val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) + // when table has child data map(pre aggregate table) then only plan will be transformed + if (!carbonTable.hasPreAggDataMap) { + isValidPlan = false + } + if (isValidPlan) { + // if it is valid plan then extract the query columns + isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, + aggregateExp, + carbonTable, + tableName, + list) + // getting the columns from filter expression + filterExp.transform { + case attr: AttributeReference => + list += getQueryColumn(attr.name, carbonTable, tableName, isFilterColumn = true) + attr + } + } + carbonTable + + // When plan has grouping expression, aggregate expression + // logical relation + case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation) + // only carbon query plan is supported checking whether logical relation is + // is for carbon + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] + && + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable + .hasPreAggDataMap => + val (carbonTable, 
tableName) = getCarbonTableAndTableName(logicalRelation) + // when table has child data map(pre aggregate table) then only plan will be transformed + if (!carbonTable.hasPreAggDataMap) { + isValidPlan = false + } + if (isValidPlan) { + // if it is valid plan then extract the query columns + isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, + aggregateExp, + carbonTable, + tableName, + list) + } + carbonTable + case _ => + isValidPlan = false + null + } + // if plan is valid then update the plan with child attributes + if (isValidPlan) { + // getting all the projection columns + val listProjectionColumn = list + .filter(queryColumn => queryColumn.getAggFunction.isEmpty && !queryColumn.isFilterColumn) + // getting all the filter columns + val listFilterColumn = list + .filter(queryColumn => queryColumn.getAggFunction.isEmpty && queryColumn.isFilterColumn) + // getting all the aggregation columns + val listAggregationColumn = list.filter(queryColumn => !queryColumn.getAggFunction.isEmpty) + // create a query plan object which will be used to select the list of pre aggregate tables + // matches with this plan + val queryPlan = new QueryPlan(listProjectionColumn.asJava, + listAggregationColumn.asJava, + listFilterColumn.asJava) + // create aggregate table selector object + val aggregateTableSelector = new AggregateTableSelector(queryPlan, carbonTable) + // select the list of valid child tables + val selectedDataMapSchemas = aggregateTableSelector.selectPreAggDataMapSchema() + // if it doesnot match with any pre aggregate table return the same plan + if (!selectedDataMapSchemas.isEmpty) { + // sort the selected child schema based on size to select smallest pre aggregate table + val (aggDataMapSchema, carbonRelation) = + selectedDataMapSchemas.asScala.map { selectedDataMapSchema => + val catalog = sparkSession.sessionState.catalog + val carbonRelation = catalog + .lookupRelation(TableIdentifier(selectedDataMapSchema.getRelationIdentifier + .getTableName, + 
Some(selectedDataMapSchema.getRelationIdentifier + .getDatabaseName))).asInstanceOf[SubqueryAlias].child + .asInstanceOf[LogicalRelation] + (selectedDataMapSchema, carbonRelation) + }.sortBy(f => f._2.relation.asInstanceOf[CarbonDatasourceHadoopRelation].sizeInBytes) + .head --- End diff -- Instead of `}.sortBy(f => f._2.relation.asInstanceOf[CarbonDatasourceHadoopRelation].sizeInBytes).head`, use `.minBy(f => f._2.relation.asInstanceOf[CarbonDatasourceHadoopRelation].sizeInBytes)`. It finds the smallest table in a single pass instead of sorting the whole collection just to take its first element. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1464#discussion_r149004422 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateQueryRules.scala --- @@ -0,0 +1,756 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{AnalysisException, CarbonDatasourceHadoopRelation, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, Divide, +Expression, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema} +import org.apache.carbondata.core.preagg.{AggregateTableSelector, QueryColumn, QueryPlan} +import org.apache.carbondata.spark.util.CarbonScalaUtil + +/** + * Class for applying Pre Aggregate rules + * Responsibility. + * 1. Check plan is valid plan for updating the parent table plan with child table + * 2. Updated the plan based on child schema + * + * Rules for Upadating the plan + * 1. Grouping expression rules + * 1.1 Change the parent attribute reference for of group expression + * to child attribute reference + * + * 2. Aggregate expression rules + * 2.1 Change the parent attribute reference for of group expression to + * child attribute reference + * 2.2 Change the count AggregateExpression to Sum as count + * is already calculated so in case of aggregate table + * we need to apply sum to get the count + * 2.2 In case of average aggregate function select 2 columns from aggregate table with + * aggregation + * sum and count. Then add divide(sum(column with sum), sum(column with count)). + * Note: During aggregate table creation for average table will be created with two columns + * one for sum(column) and count(column) to support rollup + * + * 3. 
Filter Expression rules. + * 3.1 Updated filter expression attributes with child table attributes + * 4. Update the Parent Logical relation with child Logical relation + * + * @param sparkSession + * spark session + */ +case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = { + var needAnalysis = true + plan.transformExpressions { + // first check if any preAgg scala function is applied it is present is in plan + // then call is from create preaggregate table class so no need to transform the query plan + case al@Alias(_, name) if name.equals("preAgg") => + needAnalysis = false + al + // in case of query if any unresolve alias is present then wait for plan to be resolved + // return the same plan as we can tranform the plan only when everything is resolved + case unresolveAlias@UnresolvedAlias(_, _) => + needAnalysis = false + unresolveAlias + } + // if plan is not valid for transformation then return same plan + if (!needAnalysis) { + plan + } else { + // create buffer to collect all the column and its metadata information + val list = scala.collection.mutable.ListBuffer.empty[QueryColumn] + var isValidPlan = true + val carbonTable = plan match { + // matching the plan based on supported plan + // if plan is matches with any case it will validate and get all + // information required for transforming the plan + + // When plan has grouping expression, aggregate expression + // subquery + case Aggregate(groupingExp, + aggregateExp, + SubqueryAlias(_, logicalRelation: LogicalRelation, _)) + // only carbon query plan is supported checking whether logical relation is + // is for carbon + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] + && + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable + .hasPreAggDataMap => + val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) + // when table has child 
data map(pre aggregate table) then only plan will be transformed + if (!carbonTable.hasPreAggDataMap) { + isValidPlan = false + } + if (isValidPlan) { + // if it is valid plan then extract the query columns + isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, + aggregateExp, + carbonTable, + tableName, + list) + } + carbonTable + + // below case for handling filter query + // When plan has grouping expression, aggregate expression + // filter expression + case Aggregate(groupingExp, aggregateExp, + Filter(filterExp, + SubqueryAlias(_, logicalRelation: LogicalRelation, _))) + // only carbon query plan is supported checking whether logical relation is + // is for carbon + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] + && --- End diff -- Put `&&` at the end of the previous line rather than on its own line. Please do this for all case statements. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1464#discussion_r149005119 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateQueryRules.scala --- @@ -0,0 +1,756 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{AnalysisException, CarbonDatasourceHadoopRelation, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, Divide, +Expression, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema} +import org.apache.carbondata.core.preagg.{AggregateTableSelector, QueryColumn, QueryPlan} +import org.apache.carbondata.spark.util.CarbonScalaUtil + +/** + * Class for applying Pre Aggregate rules + * Responsibility. + * 1. Check plan is valid plan for updating the parent table plan with child table + * 2. Updated the plan based on child schema + * + * Rules for Upadating the plan + * 1. Grouping expression rules + * 1.1 Change the parent attribute reference for of group expression + * to child attribute reference + * + * 2. Aggregate expression rules + * 2.1 Change the parent attribute reference for of group expression to + * child attribute reference + * 2.2 Change the count AggregateExpression to Sum as count + * is already calculated so in case of aggregate table + * we need to apply sum to get the count + * 2.2 In case of average aggregate function select 2 columns from aggregate table with + * aggregation + * sum and count. Then add divide(sum(column with sum), sum(column with count)). + * Note: During aggregate table creation for average table will be created with two columns + * one for sum(column) and count(column) to support rollup + * + * 3. 
Filter Expression rules. + * 3.1 Updated filter expression attributes with child table attributes + * 4. Update the Parent Logical relation with child Logical relation + * + * @param sparkSession + * spark session + */ +case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = { + var needAnalysis = true + plan.transformExpressions { + // first check if any preAgg scala function is applied it is present is in plan + // then call is from create preaggregate table class so no need to transform the query plan + case al@Alias(_, name) if name.equals("preAgg") => + needAnalysis = false + al + // in case of query if any unresolve alias is present then wait for plan to be resolved + // return the same plan as we can tranform the plan only when everything is resolved + case unresolveAlias@UnresolvedAlias(_, _) => + needAnalysis = false + unresolveAlias + } + // if plan is not valid for transformation then return same plan + if (!needAnalysis) { + plan + } else { + // create buffer to collect all the column and its metadata information + val list = scala.collection.mutable.ListBuffer.empty[QueryColumn] + var isValidPlan = true + val carbonTable = plan match { + // matching the plan based on supported plan + // if plan is matches with any case it will validate and get all + // information required for transforming the plan + + // When plan has grouping expression, aggregate expression + // subquery + case Aggregate(groupingExp, + aggregateExp, + SubqueryAlias(_, logicalRelation: LogicalRelation, _)) + // only carbon query plan is supported checking whether logical relation is + // is for carbon + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] + && + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable + .hasPreAggDataMap => + val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) + // when table has child 
data map(pre aggregate table) then only plan will be transformed + if (!carbonTable.hasPreAggDataMap) { + isValidPlan = false + } + if (isValidPlan) { + // if it is valid plan then extract the query columns + isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, + aggregateExp, + carbonTable, + tableName, + list) + } + carbonTable + + // below case for handling filter query + // When plan has grouping expression, aggregate expression + // filter expression + case Aggregate(groupingExp, aggregateExp, + Filter(filterExp, + SubqueryAlias(_, logicalRelation: LogicalRelation, _))) + // only carbon query plan is supported checking whether logical relation is + // is for carbon + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] + && + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable + .hasPreAggDataMap => + val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) + // when table has child data map(pre aggregate table) then only plan will be transformed + if (!carbonTable.hasPreAggDataMap) { + isValidPlan = false + } + if (isValidPlan) { + // if it is valid plan then extract the query columns + isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, + aggregateExp, + carbonTable, + tableName, + list) + // getting the columns from filter expression + filterExp.transform { + case attr: AttributeReference => + list += getQueryColumn(attr.name, carbonTable, tableName, isFilterColumn = true) + attr + } + } + carbonTable + + // When plan has grouping expression, aggregate expression + // logical relation + case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation) + // only carbon query plan is supported checking whether logical relation is + // is for carbon + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] + && + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable + .hasPreAggDataMap => + val (carbonTable, 
tableName) = getCarbonTableAndTableName(logicalRelation) + // when table has child data map(pre aggregate table) then only plan will be transformed + if (!carbonTable.hasPreAggDataMap) { + isValidPlan = false + } + if (isValidPlan) { + // if it is valid plan then extract the query columns + isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, + aggregateExp, + carbonTable, + tableName, + list) + } + carbonTable + case _ => + isValidPlan = false + null + } + // if plan is valid then update the plan with child attributes + if (isValidPlan) { + // getting all the projection columns + val listProjectionColumn = list + .filter(queryColumn => queryColumn.getAggFunction.isEmpty && !queryColumn.isFilterColumn) + // getting all the filter columns + val listFilterColumn = list + .filter(queryColumn => queryColumn.getAggFunction.isEmpty && queryColumn.isFilterColumn) + // getting all the aggregation columns + val listAggregationColumn = list.filter(queryColumn => !queryColumn.getAggFunction.isEmpty) + // create a query plan object which will be used to select the list of pre aggregate tables + // matches with this plan + val queryPlan = new QueryPlan(listProjectionColumn.asJava, + listAggregationColumn.asJava, + listFilterColumn.asJava) + // create aggregate table selector object + val aggregateTableSelector = new AggregateTableSelector(queryPlan, carbonTable) + // select the list of valid child tables + val selectedDataMapSchemas = aggregateTableSelector.selectPreAggDataMapSchema() + // if it doesnot match with any pre aggregate table return the same plan + if (!selectedDataMapSchemas.isEmpty) { + // sort the selected child schema based on size to select smallest pre aggregate table + val (aggDataMapSchema, carbonRelation) = + selectedDataMapSchemas.asScala.map { selectedDataMapSchema => + val catalog = sparkSession.sessionState.catalog + val carbonRelation = catalog + .lookupRelation(TableIdentifier(selectedDataMapSchema.getRelationIdentifier + .getTableName, + 
Some(selectedDataMapSchema.getRelationIdentifier + .getDatabaseName))).asInstanceOf[SubqueryAlias].child + .asInstanceOf[LogicalRelation] + (selectedDataMapSchema, carbonRelation) + }.sortBy(f => f._2.relation.asInstanceOf[CarbonDatasourceHadoopRelation].sizeInBytes) + .head + // transform the query plan based on selected child schema + transformPreAggQueryPlan(plan, aggDataMapSchema, carbonRelation) + } else { + plan + } + } else { + plan + } + } + } + + /** + * Below method will be used to get the child attribute reference + * based on parent name + * + * @param dataMapSchema + * child schema + * @param attributeReference + * parent attribute reference + * @param childCarbonRelation + * child logical relation + * @param aggFunction + * aggregation function applied on child + * @return child attribute reference + */ + def getChildAttributeReference(dataMapSchema: DataMapSchema, + attributeReference: AttributeReference, + childCarbonRelation: LogicalRelation, + aggFunction: String = ""): AttributeReference = { + val columnSchema = if (aggFunction.isEmpty) { + dataMapSchema.getChildColumnByParentName(attributeReference.name) + } else { + dataMapSchema.getChildColByParentWithAggFun(attributeReference.name, aggFunction) + } + // here column schema cannot be null, if it is null then aggregate table selection + // logic has some problem + if (null == columnSchema) { + throw new AnalysisException("Column doesnot exists in Pre Aggregate table") + } + // finding the child attribute from child logical relation + childCarbonRelation.attributeMap.find(p => p._2.name.equals(columnSchema.getColumnName)).get._2 + } + + /** + * Below method will be used to transform the main table plan to child table plan + * rules for transformming is as below. + * 1. Grouping expression rules + * 1.1 Change the parent attribute reference for of group expression + * to child attribute reference + * + * 2. 
Aggregate expression rules + * 2.1 Change the parent attribute reference for of group expression to + * child attribute reference + * 2.2 Change the count AggregateExpression to Sum as count + * is already calculated so in case of aggregate table + * we need to apply sum to get the count + * 2.2 In case of average aggregate function select 2 columns from aggregate table with + * aggregation sum and count. Then add divide(sum(column with sum), sum(column with count)). + * Note: During aggregate table creation for average table will be created with two columns + * one for sum(column) and count(column) to support rollup + * 3. Filter Expression rules. + * 3.1 Updated filter expression attributes with child table attributes + * 4. Update the Parent Logical relation with child Logical relation + * + * @param logicalPlan + * parent logical plan + * @param aggDataMapSchema + * select data map schema + * @param childCarbonRelation + * child carbon table relation + * @return transformed plan + */ + def transformPreAggQueryPlan(logicalPlan: LogicalPlan, + aggDataMapSchema: DataMapSchema, childCarbonRelation: LogicalRelation): LogicalPlan = { + logicalPlan.transform { + case Aggregate(grExp, aggExp, child@SubqueryAlias(_, l: LogicalRelation, _)) + if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] + && + l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasPreAggDataMap => + val (updatedGroupExp, updatedAggExp, newChild, None) = + getUpdatedExpressions(grExp, + aggExp, + child, + None, + aggDataMapSchema, + childCarbonRelation) + Aggregate(updatedGroupExp, + updatedAggExp, + newChild) + case Aggregate(grExp, + aggExp, + Filter(expression, child@SubqueryAlias(_, l: LogicalRelation, _))) + if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] + && + l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasPreAggDataMap => + val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) = + getUpdatedExpressions(grExp, + 
aggExp, + child, + Some(expression), + aggDataMapSchema, + childCarbonRelation) + Aggregate(updatedGroupExp, + updatedAggExp, + Filter(updatedFilterExpression.get, + newChild)) + case Aggregate(grExp, aggExp, l: LogicalRelation) + if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] + && + l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasPreAggDataMap => + val (updatedGroupExp, updatedAggExp, newChild, None) = + getUpdatedExpressions(grExp, + aggExp, + l, + None, + aggDataMapSchema, + childCarbonRelation) + Aggregate(updatedGroupExp, + updatedAggExp, + newChild) + } + } + + /** + * Below method will be used to get the updated expression for pre aggregated table. + * It will replace the attribute of actual plan with child table attributes. + * Updation will be done for below expression. + * 1. Grouping expression + * 2. aggregate expression + * 3. child logical plan + * 4. filter expression if present + * + * @param groupingExpressions + * actual plan grouping expression + * @param aggregateExpressions + * actual plan aggregate expression + * @param child + * child logical plan + * @param filterExpression + * filter expression + * @param aggDataMapSchema + * pre aggregate table schema + * @param childCarbonRelation + * pre aggregate table logical relation + * @return tuple of(updated grouping expression, + * updated aggregate expression, + * updated child logical plan, + * updated filter expression if present in actual plan) + */ + def getUpdatedExpressions(groupingExpressions: Seq[Expression], + aggregateExpressions: Seq[NamedExpression], + child: LogicalPlan, filterExpression: Option[Expression] = None, + aggDataMapSchema: DataMapSchema, + childCarbonRelation: LogicalRelation): (Seq[Expression], Seq[NamedExpression], LogicalPlan, + Option[Expression]) = { + // transforming the group by expression attributes with child attributes + val updatedGroupExp = groupingExpressions.map { exp => + exp.transform { + case attr: AttributeReference => 
+ getChildAttributeReference(aggDataMapSchema, attr, childCarbonRelation) + } + } + // below code is for updating the aggregate expression. + // Note: In case of aggregate expression updation we need to return alias as + // while showing the final result we need to show based on actual query + // for example: If query is "select name from table group by name" + // if we only update the attributes it will show child table column name in final output + // so for handling this if attributes does not have alias we need to return alias of + // parent + // table column name + // Rules for updating aggregate expression. + // 1. If it matches with attribute reference return alias of child attribute reference + // 2. If it matches with alias return same alias with child attribute reference + // 3. If it matches with alias of any supported aggregate function return aggregate function + // with child attribute reference. Please check class level documentation how when aggregate + // function will be updated + + val updatedAggExp = aggregateExpressions.map { + exp => exp match { --- End diff -- no need of this match , directly use case inside map --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1464#discussion_r149012346 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java --- @@ -21,7 +21,14 @@ import java.io.IOException; import java.io.Serializable; import java.util.HashMap; +import java.util.HashSet; --- End diff -- There is already a class named `DataMapSchema.java`, so it is better to rename the older class to `CarbonRowSchema` --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1464 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/859/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1464 SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1485/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1464 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/942/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1464 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1557/ --- |
In reply to this post by qiuchenjian-2
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1464#discussion_r150147994 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala --- @@ -16,17 +16,18 @@ */ package org.apache.spark.sql.execution.command.preaaggregate -import scala.collection.mutable.ListBuffer +import scala.collection.mutable.{ArrayBuffer, ListBuffer} --- End diff -- removed --- |
In reply to this post by qiuchenjian-2
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1464#discussion_r150148019 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala --- @@ -49,70 +50,109 @@ object PreAggregateUtil { def getParentCarbonTable(plan: LogicalPlan): CarbonTable = { plan match { - case Aggregate(_, aExp, SubqueryAlias(_, l: LogicalRelation, _)) - if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => - l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.metaData.carbonTable + case Aggregate(_, _, SubqueryAlias(_, logicalRelation: LogicalRelation, _)) + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]. + carbonRelation.metaData.carbonTable + case Aggregate(_, _, logicalRelation: LogicalRelation) + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]. + carbonRelation.metaData.carbonTable case _ => throw new MalformedCarbonCommandException("table does not exist") --- End diff -- fixed --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1464 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/945/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1464 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1560/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1464 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/947/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1464 SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1562/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1464 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/991/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1464 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1608/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1464 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1002/ --- |
Free forum by Nabble | Edit this page |