Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1542

Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1389/
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1542

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1399/
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1542

SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1844/
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1542#discussion_r153068582

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -797,33 +798,59 @@ object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
    * @return
    */
   private def transformAggregatePlan(logicalPlan: LogicalPlan): LogicalPlan = {
+    val validExpressionsMap = scala.collection.mutable.LinkedHashMap.empty[String, NamedExpression]
     logicalPlan transform {
       case aggregate@Aggregate(_, aExp, _) =>
-        val newExpressions = aExp.flatMap {
-          case alias@Alias(attrExpression: AggregateExpression, _) =>
-            attrExpression.aggregateFunction match {
-              case Average(attr: AttributeReference) =>
-                Seq(Alias(attrExpression
-                  .copy(aggregateFunction = Sum(attr),
-                    resultId = NamedExpression.newExprId), attr.name + "_sum")(),
-                  Alias(attrExpression
-                    .copy(aggregateFunction = Count(attr),
-                      resultId = NamedExpression.newExprId), attr.name + "_count")())
-              case Average(cast@Cast(attr: AttributeReference, _)) =>
-                Seq(Alias(attrExpression
-                  .copy(aggregateFunction = Sum(cast),
-                    resultId = NamedExpression.newExprId),
-                  attr.name + "_sum")(),
-                  Alias(attrExpression
-                    .copy(aggregateFunction = Count(cast),
-                      resultId = NamedExpression.newExprId), attr.name + "_count")())
-              case _ => Seq(alias)
-            }
-          case namedExpr: NamedExpression => Seq(namedExpr)
+        aExp.foreach {
+          case alias: Alias =>
+            validExpressionsMap ++= validateAggregateFunctionAndGetAlias(alias)
--- End diff --

Will the duplicate columns be removed from the aggregate expressions? For example, if the agg table is created with sum(col1) and avg(col1), the aggregation table should be created with sum(col1) and count(col1) only; sum(col1) should not be duplicated. Is this handled here?
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1542#discussion_r153068856

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -797,33 +798,59 @@ object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
    * @return
    */
   private def transformAggregatePlan(logicalPlan: LogicalPlan): LogicalPlan = {
+    val validExpressionsMap = scala.collection.mutable.LinkedHashMap.empty[String, NamedExpression]
     logicalPlan transform {
       case aggregate@Aggregate(_, aExp, _) =>
-        val newExpressions = aExp.flatMap {
-          case alias@Alias(attrExpression: AggregateExpression, _) =>
-            attrExpression.aggregateFunction match {
-              case Average(attr: AttributeReference) =>
-                Seq(Alias(attrExpression
-                  .copy(aggregateFunction = Sum(attr),
-                    resultId = NamedExpression.newExprId), attr.name + "_sum")(),
-                  Alias(attrExpression
-                    .copy(aggregateFunction = Count(attr),
-                      resultId = NamedExpression.newExprId), attr.name + "_count")())
-              case Average(cast@Cast(attr: AttributeReference, _)) =>
-                Seq(Alias(attrExpression
-                  .copy(aggregateFunction = Sum(cast),
-                    resultId = NamedExpression.newExprId),
-                  attr.name + "_sum")(),
-                  Alias(attrExpression
-                    .copy(aggregateFunction = Count(cast),
-                      resultId = NamedExpression.newExprId), attr.name + "_count")())
-              case _ => Seq(alias)
-            }
-          case namedExpr: NamedExpression => Seq(namedExpr)
+        aExp.foreach {
+          case alias: Alias =>
+            validExpressionsMap ++= validateAggregateFunctionAndGetAlias(alias)
--- End diff --

Yes, the validateAggregateFunctionAndGetAlias function returns a Seq((columnName_aggFunction, Alias)), which is added to a map to remove any duplicate Alias entries.
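To make the deduplication described above concrete, here is a minimal, self-contained Scala sketch of the idea (this is not the actual Catalyst/CarbonData code; AggExpr, split, and dedup are hypothetical names used only for this illustration). Each aggregate is keyed by columnName_function, avg contributes both a sum and a count entry, and a LinkedHashMap keeps one entry per key in insertion order, so sum(col1) plus avg(col1) collapses to a single sum(col1) and count(col1):

object AggDedupSketch {

  // Simplified model of an aggregate function and an aggregate expression.
  sealed trait AggFunc
  case object Sum extends AggFunc
  case object Count extends AggFunc
  case object Average extends AggFunc

  final case class AggExpr(column: String, func: AggFunc)

  // Average(col) is split into Sum(col) and Count(col); every entry is keyed by
  // "columnName_functionName", mirroring the Seq((columnName_aggFunction, Alias))
  // return value described in the comment above.
  private def split(expr: AggExpr): Seq[(String, AggExpr)] = expr.func match {
    case Average =>
      Seq(s"${expr.column}_sum"   -> AggExpr(expr.column, Sum),
          s"${expr.column}_count" -> AggExpr(expr.column, Count))
    case other =>
      Seq(s"${expr.column}_${other.toString.toLowerCase}" -> expr)
  }

  // A LinkedHashMap keeps one value per key while preserving insertion order,
  // so duplicate aggregates are dropped without reordering the expressions.
  def dedup(exprs: Seq[AggExpr]): Seq[AggExpr] = {
    val valid = scala.collection.mutable.LinkedHashMap.empty[String, AggExpr]
    exprs.foreach(e => valid ++= split(e))
    valid.values.toSeq
  }

  def main(args: Array[String]): Unit = {
    // sum(col1) + avg(col1) collapses to sum(col1), count(col1); sum is not duplicated.
    println(dedup(Seq(AggExpr("col1", Sum), AggExpr("col1", Average))))
    // prints: List(AggExpr(col1,Sum), AggExpr(col1,Count))
  }
}

Running the main method prints List(AggExpr(col1,Sum), AggExpr(col1,Count)), which matches the behaviour described for the pre-aggregate table columns.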
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1542

Build Failed with Spark 1.6, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/391/
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1542

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1521/
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1542

SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1923/
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1542

LGTM
Github user kumarvishal09 commented on the issue:
https://github.com/apache/carbondata/pull/1542

LGTM
Github user kunal642 commented on the issue:
https://github.com/apache/carbondata/pull/1542

@jackylk Please review and merge
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1542#discussion_r153995074

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -822,33 +823,59 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
    * @return
    */
   private def transformAggregatePlan(logicalPlan: LogicalPlan): LogicalPlan = {
+    val validExpressionsMap = scala.collection.mutable.LinkedHashMap.empty[String, NamedExpression]
     logicalPlan transform {
       case aggregate@Aggregate(_, aExp, _) =>
-        val newExpressions = aExp.flatMap {
-          case alias@Alias(attrExpression: AggregateExpression, _) =>
-            attrExpression.aggregateFunction match {
-              case Average(attr: AttributeReference) =>
-                Seq(Alias(attrExpression
-                  .copy(aggregateFunction = Sum(attr),
-                    resultId = NamedExpression.newExprId), attr.name + "_sum")(),
-                  Alias(attrExpression
-                    .copy(aggregateFunction = Count(attr),
-                      resultId = NamedExpression.newExprId), attr.name + "_count")())
-              case Average(cast@MatchCast(attr: AttributeReference, _)) =>
-                Seq(Alias(attrExpression
-                  .copy(aggregateFunction = Sum(cast),
-                    resultId = NamedExpression.newExprId),
-                  attr.name + "_sum")(),
-                  Alias(attrExpression
-                    .copy(aggregateFunction = Count(cast),
-                      resultId = NamedExpression.newExprId), attr.name + "_count")())
-              case _ => Seq(alias)
-            }
-          case namedExpr: NamedExpression => Seq(namedExpr)
+        aExp.foreach {
+          case alias: Alias =>
+            validExpressionsMap ++= validateAggregateFunctionAndGetAlias(alias)
+          case namedExpr: NamedExpression => validExpressionsMap.put(namedExpr.name, namedExpr)
         }
-        aggregate.copy(aggregateExpressions = newExpressions.asInstanceOf[Seq[NamedExpression]])
+        aggregate
+          .copy(aggregateExpressions = validExpressionsMap.values.toSeq)
--- End diff --

move this to previous line
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1542#discussion_r153995138

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -822,33 +823,59 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
    * @return
    */
   private def transformAggregatePlan(logicalPlan: LogicalPlan): LogicalPlan = {
+    val validExpressionsMap = scala.collection.mutable.LinkedHashMap.empty[String, NamedExpression]
     logicalPlan transform {
       case aggregate@Aggregate(_, aExp, _) =>
-        val newExpressions = aExp.flatMap {
-          case alias@Alias(attrExpression: AggregateExpression, _) =>
-            attrExpression.aggregateFunction match {
-              case Average(attr: AttributeReference) =>
-                Seq(Alias(attrExpression
-                  .copy(aggregateFunction = Sum(attr),
-                    resultId = NamedExpression.newExprId), attr.name + "_sum")(),
-                  Alias(attrExpression
-                    .copy(aggregateFunction = Count(attr),
-                      resultId = NamedExpression.newExprId), attr.name + "_count")())
-              case Average(cast@MatchCast(attr: AttributeReference, _)) =>
-                Seq(Alias(attrExpression
-                  .copy(aggregateFunction = Sum(cast),
-                    resultId = NamedExpression.newExprId),
-                  attr.name + "_sum")(),
-                  Alias(attrExpression
-                    .copy(aggregateFunction = Count(cast),
-                      resultId = NamedExpression.newExprId), attr.name + "_count")())
-              case _ => Seq(alias)
-            }
-          case namedExpr: NamedExpression => Seq(namedExpr)
+        aExp.foreach {
+          case alias: Alias =>
+            validExpressionsMap ++= validateAggregateFunctionAndGetAlias(alias)
+          case namedExpr: NamedExpression => validExpressionsMap.put(namedExpr.name, namedExpr)
         }
-        aggregate.copy(aggregateExpressions = newExpressions.asInstanceOf[Seq[NamedExpression]])
+        aggregate
+          .copy(aggregateExpressions = validExpressionsMap.values.toSeq)
       case plan: LogicalPlan => plan
     }
   }
+
+  /**
+   * This method will split the avg column into sum and count and will return a sequence of tuple
+   * of unique name, alias
+   *
+   * @param alias
+   * @return
--- End diff --

remove line 844 and line 845
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1542#discussion_r153995458

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -822,33 +823,59 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
    * @return
    */
   private def transformAggregatePlan(logicalPlan: LogicalPlan): LogicalPlan = {
+    val validExpressionsMap = scala.collection.mutable.LinkedHashMap.empty[String, NamedExpression]
     logicalPlan transform {
       case aggregate@Aggregate(_, aExp, _) =>
-        val newExpressions = aExp.flatMap {
-          case alias@Alias(attrExpression: AggregateExpression, _) =>
-            attrExpression.aggregateFunction match {
-              case Average(attr: AttributeReference) =>
-                Seq(Alias(attrExpression
-                  .copy(aggregateFunction = Sum(attr),
-                    resultId = NamedExpression.newExprId), attr.name + "_sum")(),
-                  Alias(attrExpression
-                    .copy(aggregateFunction = Count(attr),
-                      resultId = NamedExpression.newExprId), attr.name + "_count")())
-              case Average(cast@MatchCast(attr: AttributeReference, _)) =>
-                Seq(Alias(attrExpression
-                  .copy(aggregateFunction = Sum(cast),
-                    resultId = NamedExpression.newExprId),
-                  attr.name + "_sum")(),
-                  Alias(attrExpression
-                    .copy(aggregateFunction = Count(cast),
-                      resultId = NamedExpression.newExprId), attr.name + "_count")())
-              case _ => Seq(alias)
-            }
-          case namedExpr: NamedExpression => Seq(namedExpr)
+        aExp.foreach {
+          case alias: Alias =>
+            validExpressionsMap ++= validateAggregateFunctionAndGetAlias(alias)
+          case namedExpr: NamedExpression => validExpressionsMap.put(namedExpr.name, namedExpr)
         }
-        aggregate.copy(aggregateExpressions = newExpressions.asInstanceOf[Seq[NamedExpression]])
+        aggregate
+          .copy(aggregateExpressions = validExpressionsMap.values.toSeq)
--- End diff --

ok
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1542#discussion_r153995580

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -822,33 +823,59 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
    * @return
    */
   private def transformAggregatePlan(logicalPlan: LogicalPlan): LogicalPlan = {
+    val validExpressionsMap = scala.collection.mutable.LinkedHashMap.empty[String, NamedExpression]
     logicalPlan transform {
       case aggregate@Aggregate(_, aExp, _) =>
-        val newExpressions = aExp.flatMap {
-          case alias@Alias(attrExpression: AggregateExpression, _) =>
-            attrExpression.aggregateFunction match {
-              case Average(attr: AttributeReference) =>
-                Seq(Alias(attrExpression
-                  .copy(aggregateFunction = Sum(attr),
-                    resultId = NamedExpression.newExprId), attr.name + "_sum")(),
-                  Alias(attrExpression
-                    .copy(aggregateFunction = Count(attr),
-                      resultId = NamedExpression.newExprId), attr.name + "_count")())
-              case Average(cast@MatchCast(attr: AttributeReference, _)) =>
-                Seq(Alias(attrExpression
-                  .copy(aggregateFunction = Sum(cast),
-                    resultId = NamedExpression.newExprId),
-                  attr.name + "_sum")(),
-                  Alias(attrExpression
-                    .copy(aggregateFunction = Count(cast),
-                      resultId = NamedExpression.newExprId), attr.name + "_count")())
-              case _ => Seq(alias)
-            }
-          case namedExpr: NamedExpression => Seq(namedExpr)
+        aExp.foreach {
+          case alias: Alias =>
+            validExpressionsMap ++= validateAggregateFunctionAndGetAlias(alias)
+          case namedExpr: NamedExpression => validExpressionsMap.put(namedExpr.name, namedExpr)
         }
-        aggregate.copy(aggregateExpressions = newExpressions.asInstanceOf[Seq[NamedExpression]])
+        aggregate
+          .copy(aggregateExpressions = validExpressionsMap.values.toSeq)
       case plan: LogicalPlan => plan
     }
   }
+
+  /**
+   * This method will split the avg column into sum and count and will return a sequence of tuple
+   * of unique name, alias
+   *
+   * @param alias
+   * @return
--- End diff --

done
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1542

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1598/
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1542

SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1993/
Github user kunal642 commented on the issue:
https://github.com/apache/carbondata/pull/1542

@jackylk build passed
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1542

LGTM