Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1508 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1477/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1508 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1479/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1508 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1896/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1508 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1482/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1508 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1900/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1508 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1485/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1508 SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1903/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1508 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1498/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1508 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1915/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1508 Build Failed with Spark 1.6, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/392/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1508 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1522/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1508 SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1924/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1508#discussion_r153722006 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala --- @@ -221,7 +221,6 @@ object DataLoadingUtil { ValidateUtil.validateDateTimeFormat(timestampformat, "TimestampFormat") ValidateUtil.validateDateTimeFormat(dateFormat, "DateFormat") ValidateUtil.validateSortScope(table, sort_scope) - --- End diff -- don't change the file unnecessarily --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1508#discussion_r153722623 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala --- @@ -94,10 +95,21 @@ case class CreatePreAggregateTableCommand( dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2)) // updating the parent table about child table PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession) - val loadAvailable = PreAggregateUtil.checkMainTableLoad(parentTable) - if (loadAvailable) { - sparkSession.sql( - s"insert into ${ tableModel.databaseName }.${ tableModel.tableName } $queryString") + val availableLoads = PreAggregateUtil.checkMainTableLoad(parentTable) + if (availableLoads) { + val headers = childSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName) --- End diff -- Move line down --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1508#discussion_r153722722 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala --- @@ -94,10 +95,21 @@ case class CreatePreAggregateTableCommand( dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2)) // updating the parent table about child table PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession) - val loadAvailable = PreAggregateUtil.checkMainTableLoad(parentTable) - if (loadAvailable) { - sparkSession.sql( - s"insert into ${ tableModel.databaseName }.${ tableModel.tableName } $queryString") + val availableLoads = PreAggregateUtil.checkMainTableLoad(parentTable) + if (availableLoads) { + val headers = childSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName) + .mkString(",") + val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser() --- End diff -- Move down after ( --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1508#discussion_r153722923 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala --- @@ -49,13 +50,54 @@ object LoadPostAggregateListener extends OperationEventListener { carbonLoadModel.getTableName, "false") val childTableName = dataMapSchema.getRelationIdentifier.getTableName val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName - val selectQuery = dataMapSchema.getProperties.get("CHILD_SELECT QUERY") - sparkSession.sql(s"insert into $childDatabaseName.$childTableName $selectQuery") + val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser() + .addPreAggLoadFunction(s"${ dataMapSchema.getProperties.get("CHILD_SELECT QUERY") } ")) + .drop("preAggLoad") + val headers = dataMapSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName) --- End diff -- move down --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1508#discussion_r153723385 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala --- @@ -49,13 +50,54 @@ object LoadPostAggregateListener extends OperationEventListener { carbonLoadModel.getTableName, "false") val childTableName = dataMapSchema.getRelationIdentifier.getTableName val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName - val selectQuery = dataMapSchema.getProperties.get("CHILD_SELECT QUERY") - sparkSession.sql(s"insert into $childDatabaseName.$childTableName $selectQuery") + val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser() + .addPreAggLoadFunction(s"${ dataMapSchema.getProperties.get("CHILD_SELECT QUERY") } ")) + .drop("preAggLoad") + val headers = dataMapSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName) + .mkString(",") + try { + LoadTableCommand(Some(childDatabaseName), + childTableName, + null, + Nil, + Map("fileheader" -> headers), + isOverwriteTable = false, + dataFrame = Some(childDataFrame), + internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true")) + .run(sparkSession) + } finally { + CarbonSession.threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + + carbonLoadModel.getDatabaseName + "." + + carbonLoadModel.getTableName) + CarbonSession.threadUnset(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + + carbonLoadModel.getDatabaseName + "." 
+ + carbonLoadModel.getTableName) + } } } } } +object LoadPreAggregateTablePreListener extends OperationEventListener { + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override def onEvent(event: Event, operationContext: OperationContext): Unit = { + val loadEvent = event.asInstanceOf[LoadTablePreExecutionEvent] + val carbonLoadModel = loadEvent.carbonLoadModel + val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val isInternalLoadCall = carbonLoadModel.isAggLoadRequest + if (table.isChildDataMap && !isInternalLoadCall) { + throw new UnsupportedOperationException( + "Cannot insert/load data directly into pre-aggregate table") + } + --- End diff -- remove line --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1508#discussion_r153723567 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala --- @@ -253,8 +260,8 @@ object PreAggregateUtil { carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, parentTableName, parentDatabaseName, parentTableId = parentTableId) - case _ => - throw new MalformedCarbonCommandException("Un-Supported Aggregation Type") + case a@_ => --- End diff -- Keep as case others --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1508#discussion_r153723895 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -751,6 +754,58 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule } } +object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = { + + plan transform { + + case aggregate@Aggregate(_, aExp, _) if validateAggregateExpressions(aExp) => + val newExpressions = aExp.flatMap { + case alias@Alias(attrExpression: AggregateExpression, _) => + attrExpression.aggregateFunction match { + case Average(attr: AttributeReference) => + Seq(Alias(attrExpression + .copy(aggregateFunction = Sum(attr), + resultId = NamedExpression.newExprId), attr.name + "_sum")(), + Alias(attrExpression + .copy(aggregateFunction = Count(attr), + resultId = NamedExpression.newExprId), attr.name + "_count")()) + case Average(cast@Cast(attr: AttributeReference, _)) => + Seq(Alias(attrExpression + .copy(aggregateFunction = Sum(cast), + resultId = NamedExpression.newExprId), + attr.name + "_sum")(), + Alias(attrExpression + .copy(aggregateFunction = Count(cast), + resultId = NamedExpression.newExprId), attr.name + "_count")()) --- End diff -- Please format it properly --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1508#discussion_r153724030 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -751,6 +754,58 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule } } +object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = { + + plan transform { + + case aggregate@Aggregate(_, aExp, _) if validateAggregateExpressions(aExp) => + val newExpressions = aExp.flatMap { + case alias@Alias(attrExpression: AggregateExpression, _) => + attrExpression.aggregateFunction match { + case Average(attr: AttributeReference) => + Seq(Alias(attrExpression + .copy(aggregateFunction = Sum(attr), + resultId = NamedExpression.newExprId), attr.name + "_sum")(), + Alias(attrExpression + .copy(aggregateFunction = Count(attr), + resultId = NamedExpression.newExprId), attr.name + "_count")()) + case Average(cast@Cast(attr: AttributeReference, _)) => + Seq(Alias(attrExpression + .copy(aggregateFunction = Sum(cast), + resultId = NamedExpression.newExprId), + attr.name + "_sum")(), + Alias(attrExpression + .copy(aggregateFunction = Count(cast), + resultId = NamedExpression.newExprId), attr.name + "_count")()) + case _ => Seq(alias) + } + case namedExpr: NamedExpression => Seq(namedExpr) + } + aggregate.copy(aggregateExpressions = newExpressions.asInstanceOf[Seq[NamedExpression]]) + case plan: LogicalPlan => plan + } + } + + /** + * Called by PreAggregateLoadingRules to validate if plan is valid for applying rules or not. + * If the plan has PreAggLoad i.e Loading UDF and does not have PreAgg i.e Query UDF then it is + * valid. 
+ * + * @param namedExpression + * @return + */ + private def validateAggregateExpressions(namedExpression: Seq[NamedExpression]): Boolean = { + val filteredExpressions = namedExpression.filterNot(_.isInstanceOf[UnresolvedAlias]) + filteredExpressions + .exists { + expr => !expr.name.equalsIgnoreCase("PreAgg") && + expr.name.equalsIgnoreCase("preAggLoad") --- End diff -- format properly --- |
Free forum by Nabble | Edit this page |