Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1508
retest sdv please
---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1508
SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1801/
---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1508
retest sdv please
---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1508
SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1802/
---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1508
retest sdv please
---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1508
SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1819/
---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1508#discussion_r152597828

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala ---
@@ -95,8 +95,14 @@ object CarbonSetCommand {
       }
     } else if (key.startsWith(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS)) {
       sessionParams.addProperty(key.toLowerCase(), value)
+    } else if (key.startsWith(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL)) {
--- End diff --

I don't think it is required to use the `set` command for this internal call. We are not going to give users the option to load the aggregate table directly, as it may corrupt the table.
---
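For context, a minimal sketch of the alternative the reviewer is pointing at, based on the `LoadTableCommand` call that appears in the `PreAggregateListeners` diff later in this thread: the flag travels as an internal option of the load command itself, so no SET-command/session-parameter handling is needed. The surrounding values (`childDatabaseName`, `childTableName`, `headers`, `childDataFrame`, `sparkSession`) are assumed to be in scope, and imports of the Carbon classes are elided since the snippet would live inside the PR's own files.

```scala
// Sketch: pass IS_INTERNAL_LOAD_CALL as an internal option of the load
// command instead of registering it as a session property via SET.
// The argument shape is copied verbatim from this PR's listener diff.
LoadTableCommand(Some(childDatabaseName),
  childTableName,
  null,
  Nil,
  Map("fileheader" -> headers),
  isOverwriteTable = false,
  dataFrame = Some(childDataFrame),
  internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true")
).run(sparkSession)
```
---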
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1508
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1405/
---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1508
SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1850/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1508
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1466/
---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1508
SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1883/
---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1508#discussion_r153067591

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---
@@ -94,10 +95,22 @@ case class CreatePreAggregateTableCommand(
     dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
     // updating the parent table about child table
     PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)
-    val loadAvailable = PreAggregateUtil.checkMainTableLoad(parentTable)
-    if (loadAvailable) {
-      sparkSession.sql(
-        s"insert into ${ tableModel.databaseName }.${ tableModel.tableName } $queryString")
+    val availableLoads = PreAggregateUtil.checkMainTableLoad(parentTable)
+    if (availableLoads) {
+      val headers = childSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName)
+        .mkString(",")
+      val childDataFrame = Dataset.ofRows(sparkSession, new CarbonSpark2SqlParser()
+        .parse(s"insert into ${ tableModel.databaseName }.${
+          tableModel.tableName} $queryString"))
--- End diff --

Why is it not just `queryString`? Why is `insert into` required here when you are already using the load command? (See the sketch after the related comment on this hunk below.)
---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1508#discussion_r153067934

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -736,6 +739,45 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
   }
 }
 
+object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+
+    plan transform {
+      case aggregate@Aggregate(_, aExp, _) =>
+        val isLoadPlan = aExp.exists(_.name.equalsIgnoreCase("preAggLoad"))
+        if (aExp.exists(_.name.equalsIgnoreCase("PreAgg"))) {
--- End diff --

Move this `if` condition into the `case` block.
---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1508#discussion_r153067988

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -736,6 +739,45 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
   }
 }
 
+object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+
+    plan transform {
+      case aggregate@Aggregate(_, aExp, _) =>
+        val isLoadPlan = aExp.exists(_.name.equalsIgnoreCase("preAggLoad"))
--- End diff --

Move this condition into the `case` block as well.
---
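Taken together, the two comments above ask for roughly the following shape (a minimal sketch, not the PR's final code): both `exists` checks become a pattern guard on the `case`, so plans that do not carry the marker UDFs fall through the transform untouched. How the two flags interact in the real rule depends on the elided body; here the node is returned unchanged just to keep the sketch compilable.

```scala
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}

// Sketch: hoist the marker-UDF checks out of the rule body into a guard.
def transformLoadPlan(plan: LogicalPlan): LogicalPlan = plan transform {
  case aggregate @ Aggregate(_, aExp, _)
      if aExp.exists(_.name.equalsIgnoreCase("preAggLoad")) &&
         aExp.exists(_.name.equalsIgnoreCase("PreAgg")) =>
    aggregate // the rewrite of the aggregate expressions would happen here
}
```
---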
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1508#discussion_r153068009

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---
@@ -55,6 +55,7 @@ class CarbonEnv {
     // added for handling preaggregate table creation. when user will fire create ddl for
     // create table we are adding a udf so no need to apply PreAggregate rules.
     sparkSession.udf.register("preAgg", () => "")
+    sparkSession.udf.register("preAggLoad", () => "")
--- End diff --

Add a comment about the usage of this UDF.
---
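A possible shape for the requested comment; the wording is illustrative, inferred from how `preAggLoad` is used elsewhere in this PR, not the author's final text:

```scala
// "preAggLoad" is a dummy marker UDF, like "preAgg" above: it is added to the
// child select query fired for pre-aggregate data loading so that
// CarbonPreAggregateDataLoadingRules can recognise that plan, and the marker
// column is dropped from the dataframe before the actual load runs.
sparkSession.udf.register("preAggLoad", () => "")
```
---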
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1508#discussion_r153068092

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
@@ -49,13 +50,52 @@ object LoadPostAggregateListener extends OperationEventListener {
         carbonLoadModel.getTableName, "false")
       val childTableName = dataMapSchema.getRelationIdentifier.getTableName
       val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName
-      val selectQuery = dataMapSchema.getProperties.get("CHILD_SELECT QUERY")
-      sparkSession.sql(s"insert into $childDatabaseName.$childTableName $selectQuery")
+      val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser()
+        .addPreAggLoadFunction(s"${dataMapSchema.getProperties.get("CHILD_SELECT QUERY")} ")).drop("preAggLoad")
+      val headers = dataMapSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName)
+        .mkString(",")
+      try {
+        LoadTableCommand(Some(childDatabaseName),
+          childTableName,
+          null,
+          Nil,
+          Map("fileheader" -> headers),
+          isOverwriteTable = false,
+          dataFrame = Some(childDataFrame),
+          internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true")).run(sparkSession)
+      } finally {
+        CarbonSession.threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+          carbonLoadModel.getDatabaseName + "." +
+          carbonLoadModel.getTableName)
+        CarbonSession.threadUnset(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+          carbonLoadModel.getDatabaseName + "." +
+          carbonLoadModel.getTableName)
--- End diff --

The indentation is wrong; please format it properly.
---
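One way the flagged cleanup block could be formatted, with the repeated "database.table" suffix factored into a local value; the `tableKey` extraction is illustrative, not something the review prescribes:

```scala
} finally {
  // unset the thread-local segment properties set for this parent table
  val tableKey = carbonLoadModel.getDatabaseName + "." + carbonLoadModel.getTableName
  CarbonSession.threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + tableKey)
  CarbonSession.threadUnset(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + tableKey)
}
```
---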
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1508#discussion_r153068121

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---
@@ -94,10 +95,22 @@ case class CreatePreAggregateTableCommand(
     dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
     // updating the parent table about child table
     PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)
-    val loadAvailable = PreAggregateUtil.checkMainTableLoad(parentTable)
-    if (loadAvailable) {
-      sparkSession.sql(
-        s"insert into ${ tableModel.databaseName }.${ tableModel.tableName } $queryString")
+    val availableLoads = PreAggregateUtil.checkMainTableLoad(parentTable)
+    if (availableLoads) {
+      val headers = childSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName)
+        .mkString(",")
+      val childDataFrame = Dataset.ofRows(sparkSession, new CarbonSpark2SqlParser()
+        .parse(s"insert into ${ tableModel.databaseName }.${
+          tableModel.tableName} $queryString"))
--- End diff --

Why don't you use the `preAggLoad` UDF here?
---
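A sketch of what the two comments on this hunk point at, assuming this path should behave like the `LoadPostAggregateListener` diff shown earlier in this thread: feed the plain `queryString` through the same `addPreAggLoadFunction` helper so the data-loading rule fires, instead of parsing a hand-built `insert into` statement.

```scala
// Sketch: mark the child select query with the preAggLoad UDF, then drop
// the marker column before handing the dataframe to the load command.
val childDataFrame = sparkSession.sql(
  new CarbonSpark2SqlParser().addPreAggLoadFunction(s"$queryString "))
  .drop("preAggLoad")
```
---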
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1508
SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1884/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1508
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1469/
---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1508
SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1887/
---