[GitHub] [carbondata] Indhumathi27 commented on a change in pull request #3495: [CARBONDATA-3532] Support Query Rollup for MV TimeSeries Queries


GitBox
Indhumathi27 commented on a change in pull request #3495: [CARBONDATA-3532] Support Query Rollup for MV TimeSeries Queries
URL: https://github.com/apache/carbondata/pull/3495#discussion_r373326636
 
 

 ##########
 File path: datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
 ##########
 @@ -830,18 +831,186 @@ object MVHelper {
    */
   def rewriteWithMVTable(rewrittenPlan: ModularPlan, rewrite: QueryRewrite): ModularPlan = {
     if (rewrittenPlan.find(_.rewritten).isDefined) {
-      val updatedDataMapTablePlan = rewrittenPlan transform {
+      var updatedDataMapTablePlan = rewrittenPlan transform {
         case s: Select =>
           MVHelper.updateDataMap(s, rewrite)
         case g: GroupBy =>
           MVHelper.updateDataMap(g, rewrite)
       }
+      if (rewrittenPlan.rolledUp) {
+        // If the rewritten query is rolled up, then rewrite the query based on the original modular
+        // plan. Make a new outputList based on original modular plan and wrap rewritten plan with
+        // select & group-by nodes with new outputList.
+
+        // For example:
+        // Given User query:
+        // SELECT timeseries(col,'day') from maintable group by timeseries(col,'day')
+        // If plan is rewritten as per 'hour' granularity of datamap1,
+        // then rewritten query will be like,
+        // SELECT datamap1_table.`UDF:timeseries_projectjoindate_hour` AS `UDF:timeseries
+        // (projectjoindate, hour)`
+        // FROM
+        // default.datamap1_table
+        // GROUP BY datamap1_table.`UDF:timeseries_projectjoindate_hour`
+        //
+        // Now, rewrite the rewritten plan as per the 'day' granularity
+        // SELECT timeseries(gen_subsumer_0.`UDF:timeseries(projectjoindate, hour)`,'day' ) AS
+        // `UDF:timeseries(projectjoindate, day)`
+        //  FROM
+        //  (SELECT datamap2_table.`UDF:timeseries_projectjoindate_hour` AS `UDF:timeseries
+        //  (projectjoindate, hour)`
+        //  FROM
+        //    default.datamap2_table
+        //  GROUP BY datamap2_table.`UDF:timeseries_projectjoindate_hour`) gen_subsumer_0
+        // GROUP BY timeseries(gen_subsumer_0.`UDF:timeseries(projectjoindate, hour)`,'day' )
+        rewrite.modularPlan match {
+          case select: Select =>
+            val outputList = select.outputList
+            val rolledUpOutputList = updatedDataMapTablePlan.asInstanceOf[Select].outputList
+            var finalOutputList: Seq[NamedExpression] = Seq.empty
+            val mapping = outputList zip rolledUpOutputList
+            val newSubsme = rewrite.newSubsumerName()
+
+            for ((s, d) <- mapping) {
+              var name: String = getAliasName(d)
+              s match {
+                case a@Alias(scalaUdf: ScalaUDF, aliasName) =>
+                  if (scalaUdf.function.isInstanceOf[TimeSeriesFunction]) {
+                    val newName = newSubsme + ".`" + name + "`"
+                    val transformedUdf = transformTimeSeriesUdf(scalaUdf, newName)
+                    finalOutputList = finalOutputList.:+(Alias(transformedUdf, aliasName)(a.exprId,
+                      a.qualifier).asInstanceOf[NamedExpression])
+                  }
+                case Alias(attr: AttributeReference, _) =>
+                  finalOutputList = finalOutputList.:+(AttributeReference(name, attr
+                    .dataType)(
+                    exprId = attr.exprId,
+                    qualifier = Some(newSubsme)))
+                case attr: AttributeReference =>
+                  finalOutputList = finalOutputList.:+(AttributeReference(name, attr
+                    .dataType)(
+                    exprId = attr.exprId,
+                    qualifier = Some(newSubsme)))
+              }
+            }
+            val tChildren = new collection.mutable.ArrayBuffer[ModularPlan]()
+            val tAliasMap = new collection.mutable.HashMap[Int, String]()
+
+            val sel_plan = select.copy(outputList = finalOutputList,
+              inputList = finalOutputList,
+              predicateList = Seq.empty)
+            tChildren += sel_plan
+            tAliasMap += (tChildren.indexOf(sel_plan) -> newSubsme)
+            updatedDataMapTablePlan = select.copy(outputList = finalOutputList,
 
 Review comment:
   changed
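
The comment block in the hunk above describes rolling up an 'hour'-granularity datamap result to the 'day' granularity the user asked for. A minimal sketch of that idea outside Spark follows. It is not the MVHelper implementation: `RollupSketch`, `timeseries`, `hourBuckets` and `rollUpToDay` are hypothetical names used only for illustration, and the `timeseries` function here is an assumed stand-in that simply truncates a timestamp.

```scala
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

object RollupSketch {
  // Hypothetical stand-in for the timeseries UDF: truncate a timestamp
  // to the start of its hour or day.
  def timeseries(ts: LocalDateTime, granularity: String): LocalDateTime =
    granularity match {
      case "hour" => ts.truncatedTo(ChronoUnit.HOURS)
      case "day"  => ts.truncatedTo(ChronoUnit.DAYS)
      case other  =>
        throw new IllegalArgumentException(s"unsupported granularity: $other")
    }

  // Inner query: what an hour-level datamap would hold
  // (GROUP BY timeseries(col, 'hour')).
  def hourBuckets(rows: Seq[LocalDateTime]): Seq[LocalDateTime] =
    rows.map(timeseries(_, "hour")).distinct

  // Outer query: apply the coarser granularity on top of the hour buckets
  // (GROUP BY timeseries(hourColumn, 'day')).
  def rollUpToDay(hours: Seq[LocalDateTime]): Seq[LocalDateTime] =
    hours.map(timeseries(_, "day")).distinct

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      LocalDateTime.of(2020, 1, 1, 10, 15),
      LocalDateTime.of(2020, 1, 1, 10, 45),
      LocalDateTime.of(2020, 1, 1, 23, 5),
      LocalDateTime.of(2020, 1, 2, 0, 30))

    // Grouping the raw rows by day directly ...
    val direct = rows.map(timeseries(_, "day")).distinct
    // ... should match grouping by hour first and then rolling up to day.
    val rolledUp = rollUpToDay(hourBuckets(rows))
    println(s"roll-up matches direct grouping: ${direct == rolledUp}")
  }
}
```

The roll-up is only valid because every hour bucket falls inside exactly one day bucket; the same reasoning would not hold when going from a coarser granularity to a finer one.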

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 


With regards,
Apache Git Services