GitHub user kumarvishal09 opened a pull request:
https://github.com/apache/carbondata/pull/1464

[WIP][CARBONDATA-1523] Pre Aggregate table selection and Query Plan changes

Add the API in carbon layer to get suitable aggregation table for group by query.
Update query plan in carbon optimizer to support aggregation tables for group by queries.

Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily:

- [ ] Any interfaces changed: None
- [ ] Any backward compatibility impacted: None
- [ ] Document update required: Yes
- [ ] Testing done
      Please provide details on
      - Whether new unit test cases have been added or why no new tests are required?
      - How it is tested? Please attach test report.
      - Is it a performance related change? Please attach the performance test report.
      - Any additional information to help reviewers in testing this change.
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kumarvishal09/incubator-carbondata pre-aggregate_Query

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/1464.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1464

----
commit 4cc40cf6cd68f4e4c1e6cb613b3e28c5bf6024e0
Author: kumarvishal <[hidden email]>
Date: 2017-10-30T07:14:32Z

    aggregation selector

----
--- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1464 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/800/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1464 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/801/ --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1464 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1433/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1464 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/802/ --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1464 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1434/ --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1464 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1435/ --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1464#discussion_r148991769 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java --- @@ -124,6 +124,8 @@ */ private int numberOfNoDictSortColumns; + private boolean hasPreAggDataMap; --- End diff -- Better name it as 'hasChildDataMap' --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1464#discussion_r148992553 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java --- @@ -67,6 +92,83 @@ public void setRelationIdentifier(RelationIdentifier relationIdentifier) { public void setChildSchema(TableSchema childSchema) { this.childSchema = childSchema; + List<ColumnSchema> listOfColumns = this.childSchema.getListOfColumns(); + fillNonAggFunctionColumns(listOfColumns); + fillAggFunctionColumns(listOfColumns); + fillParentNameToAggregationMapping(listOfColumns); + } + + /** + * Method to prepare mapping of parent to list of aggregation function applied on that column + * @param listOfColumns + * child column schema list + */ + private void fillParentNameToAggregationMapping(List<ColumnSchema> listOfColumns) { + parentColumnToAggregationsMapping = new HashMap<>(); + for (ColumnSchema column : listOfColumns) { + if (null != column.getAggFunction() && !column.getAggFunction().isEmpty()) { + List<ParentColumnTableRelation> parentColumnTableRelations = + column.getParentColumnTableRelations(); + if (null != parentColumnTableRelations) { --- End diff -- Please check the size of this list as well, or iterate over the list instead of always reading element 0. --- |
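A minimal sketch of the second option in this comment (iterating over the relation list rather than reading index 0). `ParentRelation`, `ChildColumn` and `ParentAggregationMapping` are hypothetical, simplified stand-ins for illustration and are not the CarbonData classes from the diff; only the loop shape is the point.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for ParentColumnTableRelation (assumption, not the real class).
final class ParentRelation {
  final String columnName;

  ParentRelation(String columnName) {
    this.columnName = columnName;
  }
}

// Hypothetical stand-in for ColumnSchema (assumption, not the real class).
final class ChildColumn {
  final String name;
  final String aggFunction;           // null or empty when no aggregate is applied
  final List<ParentRelation> parents; // may be null or empty

  ChildColumn(String name, String aggFunction, List<ParentRelation> parents) {
    this.name = name;
    this.aggFunction = aggFunction;
    this.parents = parents;
  }
}

final class ParentAggregationMapping {
  // Builds parent column name -> set of aggregate functions applied to it.
  static Map<String, Set<String>> build(List<ChildColumn> columns) {
    Map<String, Set<String>> mapping = new HashMap<>();
    for (ChildColumn column : columns) {
      if (column.aggFunction == null || column.aggFunction.isEmpty()) {
        continue; // not an aggregate column
      }
      if (column.parents == null) {
        continue; // no parent relations recorded
      }
      // Iterating covers empty lists and columns with several parents,
      // so no get(0) call can fail or silently drop a relation.
      for (ParentRelation parent : column.parents) {
        mapping.computeIfAbsent(parent.columnName, k -> new HashSet<>())
            .add(column.aggFunction);
      }
    }
    return mapping;
  }
}
```

With this shape, a column whose relation list is empty is skipped, and a column derived from two parents contributes its aggregate function to both entries.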
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1464#discussion_r148992649 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java --- @@ -67,6 +92,83 @@ public void setRelationIdentifier(RelationIdentifier relationIdentifier) { public void setChildSchema(TableSchema childSchema) { this.childSchema = childSchema; + List<ColumnSchema> listOfColumns = this.childSchema.getListOfColumns(); + fillNonAggFunctionColumns(listOfColumns); + fillAggFunctionColumns(listOfColumns); + fillParentNameToAggregationMapping(listOfColumns); --- End diff -- I feel all three functions above do almost the same job; why not combine them? --- |
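One way the three fill methods might be merged is a single pass that populates all three maps at once. The sketch below assumes simplified, hypothetical types (`SchemaColumn`, `ChildSchemaIndex`) in place of `ColumnSchema` and `DataMapSchema`; the map names only loosely mirror the fields in the diff.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for ColumnSchema (assumption, not the real class).
final class SchemaColumn {
  final String name;
  final String aggFunction;             // null or empty when no aggregate is applied
  final List<String> parentColumnNames; // may be null

  SchemaColumn(String name, String aggFunction, List<String> parentColumnNames) {
    this.name = name;
    this.aggFunction = aggFunction;
    this.parentColumnNames = parentColumnNames;
  }

  boolean hasAggFunction() {
    return aggFunction != null && !aggFunction.isEmpty();
  }
}

// Hypothetical holder for the three lookups that the diff fills in three passes.
final class ChildSchemaIndex {
  final Map<String, Set<SchemaColumn>> parentToNonAggChild = new HashMap<>();
  final Map<String, Set<SchemaColumn>> parentToAggChild = new HashMap<>();
  final Map<String, Set<String>> parentToAggregations = new HashMap<>();

  // One loop over the child columns fills all three maps.
  static ChildSchemaIndex from(List<SchemaColumn> columns) {
    ChildSchemaIndex index = new ChildSchemaIndex();
    for (SchemaColumn column : columns) {
      if (column.parentColumnNames == null) {
        continue;
      }
      for (String parent : column.parentColumnNames) {
        if (column.hasAggFunction()) {
          index.parentToAggChild
              .computeIfAbsent(parent, k -> new HashSet<>()).add(column);
          index.parentToAggregations
              .computeIfAbsent(parent, k -> new HashSet<>()).add(column.aggFunction);
        } else {
          index.parentToNonAggChild
              .computeIfAbsent(parent, k -> new HashSet<>()).add(column);
        }
      }
    }
    return index;
  }
}
```

The trade-off is one slightly larger loop instead of three small ones; the list is walked once and the aggregate/non-aggregate decision lives in a single place.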
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1464#discussion_r148995479 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java --- @@ -41,6 +48,24 @@ */ private Map<String, String> properties; + /** + * map of parent column name to set of child column column without + * aggregation function + */ + private Map<String, Set<ColumnSchema>> parentToNonAggChildMapping; + + /** + * map of parent column name to set of child columns column with + * aggregation function + */ + private Map<String, Set<ColumnSchema>> parentToAggChildMapping; + + /** + * map of parent column name to set of aggregation function applied in + * in parent column + */ + private Map<String, Set<String>> parentColumnToAggregationsMapping; + public DataMapSchema(String className) { --- End diff -- Create a factory, extend this DataMapSchema, and implement the subclasses according to the class name. Everything related to the aggregation datamap should go into an AggregationDataMapSchema class. DataMapSchema should be the generic class and should contain only the generic attributes. --- |
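The factory-plus-subclass layout suggested here could look roughly like the sketch below. All class names carry a `Sketch` suffix to mark them as hypothetical, and the provider string `"preaggregate"` is an assumption made for illustration, not a value confirmed by the PR.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical generic schema: holds only attributes every datamap needs.
class GenericDataMapSchemaSketch {
  private final String className;
  private final Map<String, String> properties = new HashMap<>();

  GenericDataMapSchemaSketch(String className) {
    this.className = className;
  }

  String getClassName() {
    return className;
  }

  Map<String, String> getProperties() {
    return properties;
  }
}

// Hypothetical subclass: owns the aggregation-specific mappings.
final class AggregationDataMapSchemaSketch extends GenericDataMapSchemaSketch {
  private final Map<String, Set<String>> parentColumnToAggregations = new HashMap<>();

  AggregationDataMapSchemaSketch(String className) {
    super(className);
  }

  void addAggregation(String parentColumn, String aggFunction) {
    parentColumnToAggregations
        .computeIfAbsent(parentColumn, k -> new HashSet<>()).add(aggFunction);
  }

  Set<String> getAggregations(String parentColumn) {
    return parentColumnToAggregations
        .getOrDefault(parentColumn, Collections.<String>emptySet());
  }
}

// Hypothetical factory: picks the schema implementation from the class name.
final class DataMapSchemaFactorySketch {
  // Assumed provider name; the real value is whatever the project registers.
  static final String PRE_AGGREGATE = "preaggregate";

  static GenericDataMapSchemaSketch create(String className) {
    if (PRE_AGGREGATE.equalsIgnoreCase(className)) {
      return new AggregationDataMapSchemaSketch(className);
    }
    return new GenericDataMapSchemaSketch(className);
  }
}
```

Callers that only need generic attributes work against the base type, while the table-selection code can downcast once it knows it is dealing with an aggregation datamap.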
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1464#discussion_r148995547 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java --- @@ -116,6 +218,83 @@ public void setProperties(Map<String, String> properties) { String value = in.readUTF(); this.properties.put(key, value); } + } + /** + * Below method will be used to get the columns on which aggregate function is not applied + * @param columnName + * parent column name + * @return child column schema + */ + public ColumnSchema getChildColBasedByParentForNonAggF(String columnName) { --- End diff -- Better name as `getNonAggChildColumnByParentColName` --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1464#discussion_r148995569 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java --- @@ -116,6 +218,83 @@ public void setProperties(Map<String, String> properties) { String value = in.readUTF(); this.properties.put(key, value); } + } + /** + * Below method will be used to get the columns on which aggregate function is not applied + * @param columnName + * parent column name + * @return child column schema + */ + public ColumnSchema getChildColBasedByParentForNonAggF(String columnName) { + Set<ColumnSchema> columnSchemas = parentToNonAggChildMapping.get(columnName); + if (null != columnSchemas) { + Iterator<ColumnSchema> iterator = columnSchemas.iterator(); + while (iterator.hasNext()) { + ColumnSchema next = iterator.next(); + if (null == next.getAggFunction() || next.getAggFunction().isEmpty()) { + return next; + } + } + } + return null; + } + + /** + * Below method will be used to get the column schema based on parent column name + * @param columName + * parent column name + * @return child column schmea + */ + public ColumnSchema getChildColumnByParentName(String columName) { --- End diff -- Better name as `getChildColumnByParentColName` --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1464#discussion_r148995599 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java --- @@ -116,6 +218,83 @@ public void setProperties(Map<String, String> properties) { String value = in.readUTF(); this.properties.put(key, value); } + } + /** + * Below method will be used to get the columns on which aggregate function is not applied + * @param columnName + * parent column name + * @return child column schema + */ + public ColumnSchema getChildColBasedByParentForNonAggF(String columnName) { + Set<ColumnSchema> columnSchemas = parentToNonAggChildMapping.get(columnName); + if (null != columnSchemas) { + Iterator<ColumnSchema> iterator = columnSchemas.iterator(); + while (iterator.hasNext()) { + ColumnSchema next = iterator.next(); + if (null == next.getAggFunction() || next.getAggFunction().isEmpty()) { + return next; + } + } + } + return null; + } + + /** + * Below method will be used to get the column schema based on parent column name + * @param columName + * parent column name + * @return child column schmea + */ + public ColumnSchema getChildColumnByParentName(String columName) { + List<ColumnSchema> listOfColumns = childSchema.getListOfColumns(); + for (ColumnSchema columnSchema : listOfColumns) { + List<ParentColumnTableRelation> parentColumnTableRelations = + columnSchema.getParentColumnTableRelations(); + if (parentColumnTableRelations.get(0).getColumnName().equals(columName)) { + return columnSchema; + } + } + return null; + } + + /** + * Below method will be used to get the child column schema based on parent name and aggregate + * function applied on column + * @param columnName + * parent column name + * @param aggFunction + * aggregate function applied + * @return child column schema + */ + public ColumnSchema getChildColByParentWithAggFun(String columnName, --- End diff -- Better name as `getAggChildColumnByParentColName` --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1464#discussion_r148996693 --- Diff: core/src/main/java/org/apache/carbondata/core/preagg/QueryPlan.java --- @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.preagg; + +import java.util.List; + +/** + * class to maintain the query plan to select the data map tables + */ +public class QueryPlan { + + /** + * List of projection columns + */ + private List<QueryColumn> projectionColumn; + + /** + * list of aggregation columns + */ + private List<QueryColumn> aggregationColumns; --- End diff -- I think it is not necessary to separate out `aggregationColumns`; all of them should be part of `projectionColumn`. Just add one method, `hasAggFunc`, to `QueryColumn`. --- |
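A rough sketch of the single-list design proposed in this comment. `QueryColumnSketch` and `QueryPlanSketch` are hypothetical simplifications of `QueryColumn` and `QueryPlan`; the field set and constructor shapes are assumptions, and only `hasAggFunc` and `projectionColumn` are names taken from the review.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical simplification of QueryColumn (assumption, not the real class).
final class QueryColumnSketch {
  private final String columnName;
  private final String aggFunction; // null or empty for a plain projection

  QueryColumnSketch(String columnName, String aggFunction) {
    this.columnName = columnName;
    this.aggFunction = aggFunction;
  }

  String getColumnName() {
    return columnName;
  }

  String getAggFunction() {
    return aggFunction;
  }

  // The one method the review asks for.
  boolean hasAggFunc() {
    return aggFunction != null && !aggFunction.isEmpty();
  }
}

// Hypothetical simplification of QueryPlan holding a single column list.
final class QueryPlanSketch {
  private final List<QueryColumnSketch> projectionColumn;

  QueryPlanSketch(List<QueryColumnSketch> projectionColumn) {
    this.projectionColumn =
        Collections.unmodifiableList(new ArrayList<>(projectionColumn));
  }

  List<QueryColumnSketch> getProjectionColumn() {
    return projectionColumn;
  }

  // Derived view; no second list has to be kept in sync.
  List<QueryColumnSketch> getAggregationColumns() {
    List<QueryColumnSketch> result = new ArrayList<>();
    for (QueryColumnSketch column : projectionColumn) {
      if (column.hasAggFunc()) {
        result.add(column);
      }
    }
    return result;
  }
}
```

Because the aggregation columns are computed from the projection list on demand, the two views cannot drift apart when columns are added or removed.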
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1464#discussion_r148997142 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala --- @@ -51,6 +51,7 @@ class CarbonEnv { def init(sparkSession: SparkSession): Unit = { sparkSession.udf.register("getTupleId", () => "") + sparkSession.udf.register("preAgg", () => "") --- End diff -- Add a comment describing the usage. --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1464#discussion_r148997655 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala --- @@ -61,7 +66,7 @@ case class CreatePreAggregateTableCommand( val dbName = cm.databaseName LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]") // getting the parent table - val parentTable = PreAggregateUtil.getParentCarbonTable(dataFrame.logicalPlan) + val parentTable = PreAggregateUtil.getParentCarbonTable(logicalPlan) --- End diff -- Most of the content of this class is the same as `CreateTableCommand`, so it would be better to call that command from here. --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1464#discussion_r148997835 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala --- @@ -49,70 +50,109 @@ object PreAggregateUtil { def getParentCarbonTable(plan: LogicalPlan): CarbonTable = { plan match { - case Aggregate(_, aExp, SubqueryAlias(_, l: LogicalRelation, _)) - if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => - l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.metaData.carbonTable + case Aggregate(_, _, SubqueryAlias(_, logicalRelation: LogicalRelation, _)) + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]. + carbonRelation.metaData.carbonTable + case Aggregate(_, _, logicalRelation: LogicalRelation) + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]. + carbonRelation.metaData.carbonTable case _ => throw new MalformedCarbonCommandException("table does not exist") --- End diff -- This is not actually a case of the table not existing; it is that the plan doesn't match. --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1464#discussion_r148998207 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala --- @@ -49,70 +50,109 @@ object PreAggregateUtil { def getParentCarbonTable(plan: LogicalPlan): CarbonTable = { plan match { - case Aggregate(_, aExp, SubqueryAlias(_, l: LogicalRelation, _)) - if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => - l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.metaData.carbonTable + case Aggregate(_, _, SubqueryAlias(_, logicalRelation: LogicalRelation, _)) + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]. + carbonRelation.metaData.carbonTable + case Aggregate(_, _, logicalRelation: LogicalRelation) + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]. + carbonRelation.metaData.carbonTable case _ => throw new MalformedCarbonCommandException("table does not exist") } } /** * Below method will be used to validate the select plan * and get the required fields from select plan - * Currently only aggregate query is support any other type of query will - * fail + * Currently only aggregate query is support any other type of query will fail + * * @param plan * @param selectStmt * @return list of fields */ def validateActualSelectPlanAndGetAttrubites(plan: LogicalPlan, --- End diff -- typo `Attributes` --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1464#discussion_r149001761 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala --- @@ -49,70 +50,109 @@ object PreAggregateUtil { def getParentCarbonTable(plan: LogicalPlan): CarbonTable = { plan match { - case Aggregate(_, aExp, SubqueryAlias(_, l: LogicalRelation, _)) - if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => - l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.metaData.carbonTable + case Aggregate(_, _, SubqueryAlias(_, logicalRelation: LogicalRelation, _)) + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]. + carbonRelation.metaData.carbonTable + case Aggregate(_, _, logicalRelation: LogicalRelation) + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]. 
+ carbonRelation.metaData.carbonTable case _ => throw new MalformedCarbonCommandException("table does not exist") } } /** * Below method will be used to validate the select plan * and get the required fields from select plan - * Currently only aggregate query is support any other type of query will - * fail + * Currently only aggregate query is support any other type of query will fail + * * @param plan * @param selectStmt * @return list of fields */ def validateActualSelectPlanAndGetAttrubites(plan: LogicalPlan, selectStmt: String): scala.collection.mutable.LinkedHashMap[Field, DataMapField] = { - val fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField] plan match { - case Aggregate(_, aExp, SubqueryAlias(_, l: LogicalRelation, _)) - if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => - val carbonTable = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation - .metaData.carbonTable - val parentTableName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier - .getTableName - val parentDatabaseName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier - .getDatabaseName - val parentTableId = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier - .getTableId - if (!carbonTable.getTableInfo.getParentRelationIdentifiers.isEmpty) { + case Aggregate(groupByExp, aggExp, SubqueryAlias(_, logicalRelation: LogicalRelation, _)) => + getFieldsFromPlan(groupByExp, aggExp, logicalRelation, selectStmt) + case Aggregate(groupByExp, aggExp, logicalRelation: LogicalRelation) => + getFieldsFromPlan(groupByExp, aggExp, logicalRelation, selectStmt) + case _ => + throw new MalformedCarbonCommandException(s"Unsupported Select Statement:${ selectStmt } ") + } + } + + /** + * Below method will be used to get the fields from expressions + * @param groupByExp + * grouping expression + * @param aggExp + * aggregate expression + * @param logicalRelation + * logical relation + * @param selectStmt 
+ * select statement + * @return fields from expressions + */ + def getFieldsFromPlan(groupByExp: Seq[Expression], + aggExp: Seq[NamedExpression], logicalRelation: LogicalRelation, selectStmt: String): + scala.collection.mutable.LinkedHashMap[Field, DataMapField] = { + val fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField] + if (!logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]) { + throw new MalformedCarbonCommandException("Un-supported table") + } + val carbonTable = logicalRelation.relation. + asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation + .metaData.carbonTable + val parentTableName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier + .getTableName + val parentDatabaseName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier + .getDatabaseName + val parentTableId = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier + .getTableId + if (!carbonTable.getTableInfo.getParentRelationIdentifiers.isEmpty) { + throw new MalformedCarbonCommandException( + "Pre Aggregation is not supported on Pre-Aggregated Table") + } + groupByExp.map { + case attr: AttributeReference => + fieldToDataMapFieldMap += getField(attr.name, + attr.dataType, + parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, + parentTableName = parentTableName, + parentDatabaseName = parentDatabaseName, parentTableId = parentTableId) + case _ => + throw new MalformedCarbonCommandException(s"Unsupported Select Statement:${ --- End diff -- This is an unsupported function exception. --- |