Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1544 Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/414/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1544 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1682/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1544#discussion_r154662113 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -215,6 +296,49 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule } } + /** + * Below method will be used to extract columns from order by expression + * @param projectList + * project list from plan + * @param sortOrders + * sort order in plan + * @param carbonTable + * carbon table + * @param tableName + * table name + * @return query columns from expression + */ + def extractQueryColumnForOrderBy(projectList: Option[Seq[NamedExpression]] = None, + sortOrders: Seq[SortOrder], + carbonTable: CarbonTable, + tableName: String): Seq[QueryColumn] = { + val list = scala.collection.mutable.ListBuffer.empty[QueryColumn] + if(projectList.isDefined) { + projectList.get.map { + proList => + proList.transform { + case attr: AttributeReference => + val queryColumn = getQueryColumn(attr.name, carbonTable, tableName) + if (null != queryColumn) { + list += queryColumn --- End diff -- change it to set --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1544#discussion_r154662636 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -232,7 +356,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule def getChildAttributeReference(dataMapSchema: DataMapSchema, attributeReference: AttributeReference, attributes: Seq[AttributeReference], - aggFunction: String = ""): AttributeReference = { + aggFunction: String = "", canBeNull: Boolean = false): AttributeReference = { --- End diff -- move it down --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1544#discussion_r154665732 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -328,9 +462,145 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule Aggregate(updatedGroupExp, updatedAggExp, newChild) + // case for aggregation query with order by + case Project(_, Sort(sortOrders, global, Aggregate(groupingExp, + aggregateExp, + subQuery@CarbonSubqueryAlias(_, l: LogicalRelation)))) --- End diff -- Please format it properly --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1544 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1684/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1544#discussion_r154670368 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -328,9 +462,145 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule Aggregate(updatedGroupExp, updatedAggExp, newChild) + // case for aggregation query with order by + case Project(_, Sort(sortOrders, global, Aggregate(groupingExp, + aggregateExp, + subQuery@CarbonSubqueryAlias(_, l: LogicalRelation)))) + if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema => + val (updatedGroupExp, updatedAggExp, newChild, None) = + getUpdatedExpressions(groupingExp, + aggregateExp, + subQuery, + None, + aggDataMapSchema, + attributes, + childPlan) + val (updatedProjectList, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp, + sortOrders, + aggDataMapSchema, + attributes) + Project(updatedProjectList, + Sort(updatedSortOrder, global, Aggregate(updatedGroupExp, updatedAggExp, newChild))) + // case for handling aggregation query with filter and order by + case Project(_, Sort(sortOrders, global, Aggregate(groupingExp, + aggregateExp, + Filter(expression, subQuery@CarbonSubqueryAlias(_, l: LogicalRelation))))) + if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema => + val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) = + getUpdatedExpressions(groupingExp, + aggregateExp, + subQuery, + Some(expression), + aggDataMapSchema, + attributes, + childPlan) + val (updatedProjectList, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp, + sortOrders, + aggDataMapSchema, + attributes) + Project(updatedProjectList, + Sort(updatedSortOrder, global, Aggregate(updatedGroupExp, updatedAggExp, + Filter(updatedFilterExpression.get, newChild)))) + // case for handling 
aggregation with order by when only projection column exits + case Sort(sortOrders, global, Aggregate(groupingExp, + aggregateExp, + subQuery@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable + .hasDataMapSchema => + val (updatedGroupExp, updatedAggExp, newChild, None) = + getUpdatedExpressions(groupingExp, + aggregateExp, + subQuery, + None, + aggDataMapSchema, + attributes, + childPlan) + val (_, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp, + sortOrders, + aggDataMapSchema, + attributes) + Sort(updatedSortOrder, global, Aggregate(updatedGroupExp, updatedAggExp, newChild)) + // case for handling aggregation with order by and filter when only projection column exits + case Sort(sortOrders, global, Aggregate(groupingExp, + aggregateExp, + Filter(expression, subQuery@CarbonSubqueryAlias(_, l: LogicalRelation)))) + if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema => + val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) = + getUpdatedExpressions(groupingExp, + aggregateExp, + subQuery, + Some(expression), + aggDataMapSchema, + attributes, + childPlan) + val (_, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp, + sortOrders, + aggDataMapSchema, + attributes) + Sort(updatedSortOrder, global, Aggregate(updatedGroupExp, updatedAggExp, newChild)) } } + /** + * Below method will be used to updated the maintable plan for order by query + * In case of order by we need to update project list and sort order attributes. 
+ * + * @param aggregateExp + * child table aggregate expression + * @param sortOrders + * sort order expression in maintable plan + * @param aggDataMapSchema + * child data map schema + * @param attributes + * child attributes + * @return updated project list and updated sort order + */ + def transformPlanForOrderBy(aggregateExp: Seq[NamedExpression], + sortOrders: Seq[SortOrder], aggDataMapSchema: DataMapSchema, + attributes: Seq[AttributeReference]): (Seq[NamedExpression], Seq[SortOrder]) = { + val updatedProjectList = new ArrayBuffer[NamedExpression]() + // getting the updated project list from aggregate expression + aggregateExp.foreach{f => f.transform { + // for projection column + case alias@Alias(attr: AttributeReference, name) => + updatedProjectList += AttributeReference(name, attr.dataType, attr.nullable)(alias.exprId, + alias.qualifier, + alias.isGenerated) + alias + // for aggregaton column + case alias@Alias(attr: AggregateExpression, name) => + updatedProjectList += AttributeReference(name, attr.dataType, attr.nullable)(alias.exprId, + alias.qualifier, + alias.isGenerated) + alias + } + } + val updatedSortOrders = new ArrayBuffer[SortOrder]() + // getting the updated sort order + sortOrders.map { + order => order.child match { --- End diff -- move up `order =>` --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1544#discussion_r154670559 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -328,9 +462,145 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule Aggregate(updatedGroupExp, updatedAggExp, newChild) + // case for aggregation query with order by + case Project(_, Sort(sortOrders, global, Aggregate(groupingExp, + aggregateExp, + subQuery@CarbonSubqueryAlias(_, l: LogicalRelation)))) + if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema => + val (updatedGroupExp, updatedAggExp, newChild, None) = + getUpdatedExpressions(groupingExp, + aggregateExp, + subQuery, + None, + aggDataMapSchema, + attributes, + childPlan) + val (updatedProjectList, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp, + sortOrders, + aggDataMapSchema, + attributes) + Project(updatedProjectList, + Sort(updatedSortOrder, global, Aggregate(updatedGroupExp, updatedAggExp, newChild))) + // case for handling aggregation query with filter and order by + case Project(_, Sort(sortOrders, global, Aggregate(groupingExp, + aggregateExp, + Filter(expression, subQuery@CarbonSubqueryAlias(_, l: LogicalRelation))))) + if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema => + val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) = + getUpdatedExpressions(groupingExp, + aggregateExp, + subQuery, + Some(expression), + aggDataMapSchema, + attributes, + childPlan) + val (updatedProjectList, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp, + sortOrders, + aggDataMapSchema, + attributes) + Project(updatedProjectList, + Sort(updatedSortOrder, global, Aggregate(updatedGroupExp, updatedAggExp, + Filter(updatedFilterExpression.get, newChild)))) + // case for handling 
aggregation with order by when only projection column exits + case Sort(sortOrders, global, Aggregate(groupingExp, + aggregateExp, + subQuery@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable + .hasDataMapSchema => + val (updatedGroupExp, updatedAggExp, newChild, None) = + getUpdatedExpressions(groupingExp, + aggregateExp, + subQuery, + None, + aggDataMapSchema, + attributes, + childPlan) + val (_, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp, + sortOrders, + aggDataMapSchema, + attributes) + Sort(updatedSortOrder, global, Aggregate(updatedGroupExp, updatedAggExp, newChild)) + // case for handling aggregation with order by and filter when only projection column exits + case Sort(sortOrders, global, Aggregate(groupingExp, + aggregateExp, + Filter(expression, subQuery@CarbonSubqueryAlias(_, l: LogicalRelation)))) + if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema => + val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) = + getUpdatedExpressions(groupingExp, + aggregateExp, + subQuery, + Some(expression), + aggDataMapSchema, + attributes, + childPlan) + val (_, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp, + sortOrders, + aggDataMapSchema, + attributes) + Sort(updatedSortOrder, global, Aggregate(updatedGroupExp, updatedAggExp, newChild)) } } + /** + * Below method will be used to updated the maintable plan for order by query + * In case of order by we need to update project list and sort order attributes. 
+ * + * @param aggregateExp + * child table aggregate expression + * @param sortOrders + * sort order expression in maintable plan + * @param aggDataMapSchema + * child data map schema + * @param attributes + * child attributes + * @return updated project list and updated sort order + */ + def transformPlanForOrderBy(aggregateExp: Seq[NamedExpression], + sortOrders: Seq[SortOrder], aggDataMapSchema: DataMapSchema, + attributes: Seq[AttributeReference]): (Seq[NamedExpression], Seq[SortOrder]) = { + val updatedProjectList = new ArrayBuffer[NamedExpression]() + // getting the updated project list from aggregate expression + aggregateExp.foreach{f => f.transform { + // for projection column + case alias@Alias(attr: AttributeReference, name) => + updatedProjectList += AttributeReference(name, attr.dataType, attr.nullable)(alias.exprId, + alias.qualifier, + alias.isGenerated) + alias + // for aggregaton column + case alias@Alias(attr: AggregateExpression, name) => + updatedProjectList += AttributeReference(name, attr.dataType, attr.nullable)(alias.exprId, + alias.qualifier, + alias.isGenerated) + alias + } + } + val updatedSortOrders = new ArrayBuffer[SortOrder]() + // getting the updated sort order + sortOrders.map { --- End diff -- take the output of map, no need of adding to `updatedSortOrders` --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1544#discussion_r154671387 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -439,6 +711,31 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) } + /** + * Below method will be used to get the updated sort order attribute + * based on pre aggregate table + * @param sortOrderAttr + * sort order attributes reference + * @param aggregateExpressions + * aggregate expression + * @return updated sortorder attribute + */ + def getUpdatedSortOrderExpression(sortOrderAttr: AttributeReference, + aggregateExpressions: Seq[NamedExpression]): Expression = { + val updatedExpression = aggregateExpressions collectFirst { + // in case of alias we need to match with alias name and when alias is not present + // we need to compare with attribute reference name + case alias@Alias(attr: AttributeReference, name) if attr.name.equals(sortOrderAttr.name) || --- End diff -- format properly --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1544 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2071/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1544#discussion_r154671553 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -741,17 +1037,38 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule dataType: String = "", isChangedDataType: Boolean = false, isFilterColumn: Boolean = false): QueryColumn = { - val columnSchema = carbonTable.getColumnByName(tableName, - columnName.toLowerCase).getColumnSchema - if (isChangedDataType) { - new QueryColumn(columnSchema, columnSchema.getDataType.getName, - aggFunction.toLowerCase, isFilterColumn) + val columnSchema = carbonTable.getColumnByName(tableName, columnName.toLowerCase) + if(null == columnSchema) { + null } else { - new QueryColumn(columnSchema, + if (isChangedDataType) { + new QueryColumn(columnSchema.getColumnSchema, + columnSchema.getDataType.getName, + aggFunction.toLowerCase, + isFilterColumn) + } else { + new QueryColumn(columnSchema.getColumnSchema, CarbonScalaUtil.convertSparkToCarbonSchemaDataType(dataType), aggFunction.toLowerCase, isFilterColumn) + } } } + + def createChildSelectQuery(tableSchema: TableSchema): String = { + val aggregateColumns = scala.collection.mutable.ArrayBuffer.empty[String] + val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String] + tableSchema.getListOfColumns.asScala.foreach { + a => if (a.getAggFunction.nonEmpty) { --- End diff -- move up `a =>` --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1544#discussion_r154673057 --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala --- @@ -152,22 +152,21 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp def extendedAnalyzerRules: Seq[Rule[LogicalPlan]] = Nil def internalAnalyzerRules: Seq[Rule[LogicalPlan]] = { - catalog.ParquetConversions :: - catalog.OrcConversions :: - CarbonPreInsertionCasts(sparkSession) :: - CarbonPreAggregateQueryRules(sparkSession) :: + catalog.ParquetConversions :: + catalog.OrcConversions :: + CarbonPreInsertionCasts(sparkSession) :: CarbonPreAggregateDataLoadingRules :: - CarbonIUDAnalysisRule(sparkSession) :: - AnalyzeCreateTable(sparkSession) :: - PreprocessTableInsertion(conf) :: - DataSourceAnalysis(conf) :: - (if (conf.runSQLonFile) { - new ResolveDataSource(sparkSession) :: Nil - } else { Nil } - ) + CarbonIUDAnalysisRule(sparkSession) :: + AnalyzeCreateTable(sparkSession) :: + PreprocessTableInsertion(conf) :: + DataSourceAnalysis(conf) :: + (if (conf.runSQLonFile) { + new ResolveDataSource(sparkSession) :: Nil + } else { Nil } + ) --- End diff -- format it properly --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1544 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2072/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1544 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1686/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1544 Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/419/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1544 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2077/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1544 Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/421/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1544 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2080/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1544 Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/422/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1544 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2081/ --- |
Free forum by Nabble | Edit this page