Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2083#discussion_r177145374

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -349,10 +442,29 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
          carbonTable,
          agg)
        isPlanUpdated = true
-       Aggregate(updatedGroupExp,
+       val updateAggPlan = Aggregate(updatedGroupExp,
          updatedAggExp,
          CarbonReflectionUtils
            .getSubqueryAlias(sparkSession, Some(alias), newChild, None))
+       if(carbonTable.isStreamingTable) {
--- End diff --

space after `if`
---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2083#discussion_r177145511

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -405,11 +517,30 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
          carbonTable,
          agg)
        isPlanUpdated = true
-       Aggregate(updatedGroupExp,
+       val updateAggPlan = Aggregate(updatedGroupExp,
          updatedAggExp,
          Filter(updatedFilterExpression.get,
            CarbonReflectionUtils
              .getSubqueryAlias(sparkSession, Some(alias), newChild, None)))
+       if(carbonTable.isStreamingTable) {
--- End diff --

This code is duplicated 3 times.
---
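To illustrate the deduplication this comment asks for: the `if (carbonTable.isStreamingTable)` block that follows each of the three `Aggregate(...)` constructions could be hoisted into one helper, with the streaming-specific plan construction passed in by the caller. This is a minimal sketch under that assumption, meant to sit inside the rule class; `wrapForStreamingTable` is a hypothetical name, not code from the PR:

    import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // Hypothetical helper: each match arm builds its own updateAggPlan
    // (plain, filtered, or joined) and delegates the streaming-table special
    // case here, so the conditional exists only once.
    def wrapForStreamingTable(
        carbonTable: CarbonTable,
        updateAggPlan: LogicalPlan)(
        buildStreamingPlan: LogicalPlan => LogicalPlan): LogicalPlan = {
      if (carbonTable.isStreamingTable) {
        // whatever the duplicated block does (building the plan that unions
        // fact and aggregate data) is supplied by the caller
        buildStreamingPlan(updateAggPlan)
      } else {
        updateAggPlan
      }
    }

Each of the three call sites would then end with `wrapForStreamingTable(carbonTable, updateAggPlan)(plan => ...)` instead of repeating the conditional.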
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2083#discussion_r177145680

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -466,7 +594,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
          carbonTable,
          agg)
        isPlanUpdated = true
-       Aggregate(updatedGroupExp,
+       val updateAggPlan = Aggregate(updatedGroupExp,
--- End diff --

move `updatedGroupExp` to the next line
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2083

Build Success with Spark 2.2.1, Please check CI
http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3381/
---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2083

SDV Build Fail, Please check CI
http://144.76.159.231:8080/job/ApacheSDVTests/4104/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2083

Build Success with Spark 2.1.0, Please check CI
http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4606/
---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2083#discussion_r177619092

--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/Segment.java ---
@@ -50,9 +51,11 @@ public String getSegmentFileName() {
   }

   public static List<Segment> toSegmentList(String[] segmentIds) {
+    int startIndex =
--- End diff --

Don't change this method; better to add a new method.
---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2083#discussion_r177619510

--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/Segment.java ---
@@ -50,9 +51,11 @@ public String getSegmentFileName() {
   }

   public static List<Segment> toSegmentList(String[] segmentIds) {
+    int startIndex =
+        segmentIds[0].equals(CarbonCommonConstants.PREAGGQUERY_SEGMENTS_CONSTANTS) ? 1 : 0;
--- End diff --

Can you describe why this check is required?
---
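For what the check seems to do: judging by the constant's name, a pre-aggregate query prepends a marker entry to the segment-id array, and `startIndex` skips it. Combining that reading with the previous comment (add a new method rather than changing `toSegmentList`), a separate entry point could look like the sketch below. It is written in Scala for consistency with the other examples here, although `Segment` itself is Java, and the method name is made up:

    import java.util.{List => JList}
    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.datamap.Segment

    // Hypothetical separate entry point: drop the pre-agg query marker if it
    // is present, then delegate to the untouched toSegmentList.
    def toSegmentListForPreAggQuery(segmentIds: Array[String]): JList[Segment] = {
      val ids =
        if (segmentIds.nonEmpty &&
            segmentIds.head == CarbonCommonConstants.PREAGGQUERY_SEGMENTS_CONSTANTS) {
          segmentIds.tail
        } else {
          segmentIds
        }
      Segment.toSegmentList(ids)
    }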
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2083#discussion_r177619930

--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggStreaming.scala ---
@@ -0,0 +1,79 @@
+package org.apache.carbondata.integration.spark.testsuite.preaggregate
+
+import org.apache.spark.sql.CarbonDatasourceHadoopRelation
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Union}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, Ignore}
+
+
+@Ignore
+class TestPreAggStreaming extends QueryTest with BeforeAndAfterAll {
+  override def beforeAll: Unit = {
+    sql("drop table if exists mainTable")
+    sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
+    sql("create datamap agg0 on table mainTable using 'preaggregate' as select name from mainTable group by name")
+    sql("create datamap agg1 on table mainTable using 'preaggregate' as select name,sum(age) from mainTable group by name")
+    sql("create datamap agg2 on table mainTable using 'preaggregate' as select name,avg(age) from mainTable group by name")
+    sql("create datamap agg3 on table mainTable using 'preaggregate' as select name,sum(CASE WHEN age=35 THEN id ELSE 0 END) from mainTable group by name")
+  }
+
+  test("Test Streaming table plan with only projection column") {
+    val df = sql("select name from maintable group by name")
+    assert(validateStreamingTablePlan(df.queryExecution.analyzed))
+  }
+
+  test("Test Streaming table plan with only projection column and order by") {
+    val df = sql("select name from maintable group by name")
+    assert(validateStreamingTablePlan(df.queryExecution.analyzed))
+  }
+
+  test("Test Streaming table plan with sum aggregator") {
+    val df = sql("select name, sum(age) from maintable group by name order by name")
+    assert(validateStreamingTablePlan(df.queryExecution.analyzed))
+  }
+
+  test("Test Streaming table plan with sum aggregator and order by") {
+    val df = sql("select name, sum(age) from maintable group by name order by name")
+    assert(validateStreamingTablePlan(df.queryExecution.analyzed))
+  }
+
+  test("Test Streaming table plan with avg aggregator") {
+    val df = sql("select name, avg(age) from maintable group by name")
+    assert(validateStreamingTablePlan(df.queryExecution.analyzed))
+  }
+
+  test("Test Streaming table plan with expression ") {
+    val df = sql("select name, sum(CASE WHEN age=35 THEN id ELSE 0 END) from maintable group by name order by name")
+    assert(validateStreamingTablePlan(df.queryExecution.analyzed))
+  }
+
+  /**
+   * Below method will be used validate whether plan is already updated in case of streaming table
+   * In case of streaming table it will add UnionNode to get the data from fact and aggregate both
+   * as aggregate table will be updated after each handoff.
+   * So if plan is already updated no need to transform the plan again
+   * @param logicalPlan
+   * query plan
+   * @return whether need to update the query plan or not
+   */
+  def validateStreamingTablePlan(logicalPlan: LogicalPlan) : Boolean = {
+    var isChildTableExists: Boolean = false
+    logicalPlan.transform {
+      case union @ Union(Seq(plan1, plan2)) =>
+        plan2.collect{
+          case logicalRelation: LogicalRelation if
+            logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+            logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
+              .isChildDataMap =>
+            isChildTableExists = false
--- End diff --

I think it should be true
---
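With the flag flipped to `true` as this comment suggests, the helper would read as below. The tail of the method is cut off in the quote above, so the `union` return value and the final `isChildTableExists` line are reconstructed, not copied from the PR:

    import org.apache.spark.sql.CarbonDatasourceHadoopRelation
    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Union}
    import org.apache.spark.sql.execution.datasources.LogicalRelation

    def validateStreamingTablePlan(logicalPlan: LogicalPlan): Boolean = {
      var isChildTableExists: Boolean = false
      logicalPlan.transform {
        case union @ Union(Seq(plan1, plan2)) =>
          plan2.collect {
            case logicalRelation: LogicalRelation
              if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
                 logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
                   .carbonTable.isChildDataMap =>
              // a child data map (aggregate table) relation under the union
              // means the plan was already rewritten for the streaming table
              isChildTableExists = true
          }
          union
      }
      isChildTableExists
    }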
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2083#discussion_r177683347

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -262,12 +351,12 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
     var isPlanUpdated = false
     val updatedPlan = logicalPlan.transform {
       case agg@Aggregate(
-        grExp,
-        aggExp,
-        CarbonSubqueryAlias(alias1, child@CarbonSubqueryAlias(alias2, l: LogicalRelation)))
+      grExp,
+      aggExp,
+      CarbonSubqueryAlias(alias1, child@CarbonSubqueryAlias(alias2, l: LogicalRelation)))
--- End diff --

Format it properly
---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2083#discussion_r177685409

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -493,6 +629,210 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
     updatedPlan
   }

+  /**
+   * Method to get the aggregate query plan
+   * @param aggPlan
+   * aggregate table query plan
+   * @param grExp
+   * fact group by expression
+   * @param aggExp
+   * fact aggregate expression
+   * @param carbonTable
+   * fact table
+   * @param aggregationDataMapSchema
+   * selected aggregation data map
+   * @param factAggPlan
+   * fact aggregate query plan
+   * @return updated plan
+   */
+  def getAggregateQueryPlan(aggPlan: LogicalPlan,
+      grExp: Seq[Expression],
+      aggExp: Seq[NamedExpression],
+      carbonTable: CarbonTable,
+      aggregationDataMapSchema: DataMapSchema,
+      factAggPlan: LogicalPlan): LogicalPlan = {
+    // to handle streaming table with pre aggregate
+    if (carbonTable.isStreamingTable) {
+      setSegmentsForStreaming(carbonTable, aggregationDataMapSchema)
--- End diff --

There is a possibility of losing data from the fact table if you set only the aggregate segments here and read the streaming segments directly from the fact table: during handoff, streaming segments are marked for delete and won't be accessible during the fact read. So set all the current fact segments here as well, along with the aggregate segments. During the fact read, compare the current segments with the set segments and read all intersected segments.
---
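The fix being asked for amounts to: when rewriting the plan, remember the fact table's current segments along with the aggregate segments, and at fact read time read only the intersection of the remembered set with the segments that are still valid, so streaming segments handed off (and marked for delete) in between are skipped instead of breaking the read. A minimal sketch of that intersection, with hypothetical names:

    // Hypothetical read-time filter: `segmentsSetAtRewrite` were captured
    // when the query plan was rewritten; `currentValidSegments` are the fact
    // table's valid segments at read time. Reading only the intersection
    // drops streaming segments handed off between the two points.
    def segmentsToRead(
        segmentsSetAtRewrite: Seq[String],
        currentValidSegments: Seq[String]): Seq[String] = {
      val valid = currentValidSegments.toSet
      segmentsSetAtRewrite.filter(valid.contains)
    }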
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2083#discussion_r177979593

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -493,6 +629,210 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
     updatedPlan
   }

+  /**
+   * Method to get the aggregate query plan
+   * @param aggPlan
+   * aggregate table query plan
+   * @param grExp
+   * fact group by expression
+   * @param aggExp
+   * fact aggregate expression
+   * @param carbonTable
+   * fact table
+   * @param aggregationDataMapSchema
+   * selected aggregation data map
+   * @param factAggPlan
+   * fact aggregate query plan
+   * @return updated plan
+   */
+  def getAggregateQueryPlan(aggPlan: LogicalPlan,
+      grExp: Seq[Expression],
+      aggExp: Seq[NamedExpression],
+      carbonTable: CarbonTable,
+      aggregationDataMapSchema: DataMapSchema,
+      factAggPlan: LogicalPlan): LogicalPlan = {
+    // to handle streaming table with pre aggregate
+    if (carbonTable.isStreamingTable) {
+      setSegmentsForStreaming(carbonTable, aggregationDataMapSchema)
--- End diff --

fixed
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2083

Build Failed with Spark 2.2.1, Please check CI
http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3426/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2083

Build Success with Spark 2.1.0, Please check CI
http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4653/
---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2083

SDV Build Fail, Please check CI
http://144.76.159.231:8080/job/ApacheSDVTests/4163/
---
Github user kumarvishal09 commented on the issue:
https://github.com/apache/carbondata/pull/2083

retest this please
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2083

Build Success with Spark 2.2.1, Please check CI
http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3430/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2083

Build Success with Spark 2.1.0, Please check CI
http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4657/
---
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2083

SDV Build Success, Please check CI
http://144.76.159.231:8080/job/ApacheSDVTests/4165/
---
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2083

Build Failed with Spark 2.1.0, Please check CI
http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4676/
---