[GitHub] carbondata pull request #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support ...

qiuchenjian-2
GitHub user kumarvishal09 opened a pull request:

    https://github.com/apache/carbondata/pull/1626

    [CARBONDATA-1519][PreAgg-Timeseries] Support Query and Load on timeseries table

    Support loading and querying on timeseries pre-aggregate tables
   
     - [ ] Any interfaces changed?
     No
     - [ ] Any backward compatibility impacted?
     No
     - [ ] Document update required?
     No
     - [ ] Testing done
         Added UT test cases for timeseries table selection, data loading on timeseries tables, and validation of timeseries table data
           
     - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
   


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kumarvishal09/incubator-carbondata Branch_master_Timeseries_5-12_Query

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/1626.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1626
   
----
commit 31ff61fb36336d3cbbf518cde6716894eed17638
Author: kumarvishal <[hidden email]>
Date:   2017-12-05T10:30:48Z

    added support for time series create table

commit 9e72145b862553ad34ddced5bfe1282e23561fdd
Author: kumarvishal <[hidden email]>
Date:   2017-12-05T15:26:58Z

    Added support for timeseries query

----


---

[GitHub] carbondata issue #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support Query a...

qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1626
 
    Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/515/



---

[GitHub] carbondata pull request #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support ...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1626#discussion_r155263571
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java ---
    @@ -74,7 +86,49 @@ public ColumnSchema getNonAggChildColBasedByParent(String columnName) {
           Iterator<ColumnSchema> iterator = columnSchemas.iterator();
           while (iterator.hasNext()) {
             ColumnSchema next = iterator.next();
    -        if (null == next.getAggFunction() || next.getAggFunction().isEmpty()) {
    +        if ((null == next.getAggFunction() || next.getAggFunction().isEmpty()) && null == next
    +            .getTimeSeriesFunction() || next.getTimeSeriesFunction().isEmpty()) {
    +          return next;
    +        }
    +      }
    +    }
    +    return null;
    +  }
    +
    +  /**
    +   * Below method will be used to get the columns on which aggregate function is not applied
    +   * @param columnName
    +   *                parent column name
    +   * @return child column schema
    +   */
    +  public ColumnSchema getNonAggNonTimeChildColBasedByParent(String columnName) {
    --- End diff --
   
    Please rename the method to `getNonAggTimeChildColBasedByParent`
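
The `+` condition in this hunk mixes `&&` and `||` without grouping the timeseries check. Because `&&` binds tighter than `||`, Java reads it as `((agg blank) && tsf == null) || tsf.isEmpty()`. A column that has an aggregate function and an empty timeseries function therefore still matches, and when the aggregate function is set and the timeseries function is null, evaluation falls through to `isEmpty()` on a null reference. The sketch below uses plain strings in place of `ColumnSchema.getAggFunction()` / `getTimeSeriesFunction()` (an assumption for illustration; `ColumnChecks` is a hypothetical name) and contrasts a fully parenthesized check with the expression as written.

```java
// Hypothetical helper: plain strings stand in for
// ColumnSchema.getAggFunction() and ColumnSchema.getTimeSeriesFunction().
final class ColumnChecks {

  private ColumnChecks() {}

  // Null-safe check for "no function set".
  static boolean isBlank(String value) {
    return value == null || value.isEmpty();
  }

  // Intended condition: no aggregate function AND no timeseries function.
  static boolean isNonAggNonTime(String aggFunction, String timeSeriesFunction) {
    return isBlank(aggFunction) && isBlank(timeSeriesFunction);
  }

  // Same expression as in the diff; Java parses it as ((A || B) && C) || D.
  static boolean asWrittenInDiff(String aggFunction, String timeSeriesFunction) {
    return (null == aggFunction || aggFunction.isEmpty()) && null == timeSeriesFunction
        || timeSeriesFunction.isEmpty();
  }
}
```

With `aggFunction = "sum"` and an empty timeseries function, `asWrittenInDiff` returns `true` while `isNonAggNonTime` returns `false`; with `"sum"` and `null`, `asWrittenInDiff` throws a `NullPointerException`.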


---

[GitHub] carbondata pull request #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support ...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1626#discussion_r155263690
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java ---
    @@ -74,7 +86,49 @@ public ColumnSchema getNonAggChildColBasedByParent(String columnName) {
           Iterator<ColumnSchema> iterator = columnSchemas.iterator();
           while (iterator.hasNext()) {
             ColumnSchema next = iterator.next();
    -        if (null == next.getAggFunction() || next.getAggFunction().isEmpty()) {
    +        if ((null == next.getAggFunction() || next.getAggFunction().isEmpty()) && null == next
    --- End diff --
   
    Please rename the method to `getNonAggNonTimeChildColBasedByParent`


---

[GitHub] carbondata pull request #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support ...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1626#discussion_r155264012
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java ---
    @@ -74,7 +86,49 @@ public ColumnSchema getNonAggChildColBasedByParent(String columnName) {
           Iterator<ColumnSchema> iterator = columnSchemas.iterator();
           while (iterator.hasNext()) {
             ColumnSchema next = iterator.next();
    -        if (null == next.getAggFunction() || next.getAggFunction().isEmpty()) {
    +        if ((null == next.getAggFunction() || next.getAggFunction().isEmpty()) && null == next
    +            .getTimeSeriesFunction() || next.getTimeSeriesFunction().isEmpty()) {
    +          return next;
    +        }
    +      }
    +    }
    +    return null;
    +  }
    +
    +  /**
    +   * Below method will be used to get the columns on which aggregate function is not applied
    --- End diff --
   
    Please change the description


---

[GitHub] carbondata pull request #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support ...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1626#discussion_r155264101
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java ---
    @@ -74,7 +86,49 @@ public ColumnSchema getNonAggChildColBasedByParent(String columnName) {
           Iterator<ColumnSchema> iterator = columnSchemas.iterator();
           while (iterator.hasNext()) {
             ColumnSchema next = iterator.next();
    -        if (null == next.getAggFunction() || next.getAggFunction().isEmpty()) {
    +        if ((null == next.getAggFunction() || next.getAggFunction().isEmpty()) && null == next
    +            .getTimeSeriesFunction() || next.getTimeSeriesFunction().isEmpty()) {
    +          return next;
    +        }
    +      }
    +    }
    +    return null;
    +  }
    +
    +  /**
    +   * Below method will be used to get the columns on which aggregate function is not applied
    +   * @param columnName
    +   *                parent column name
    +   * @return child column schema
    +   */
    +  public ColumnSchema getNonAggNonTimeChildColBasedByParent(String columnName) {
    +    Set<ColumnSchema> columnSchemas = parentToNonAggChildMapping.get(columnName);
    +    if (null != columnSchemas) {
    +      Iterator<ColumnSchema> iterator = columnSchemas.iterator();
    +      while (iterator.hasNext()) {
    +        ColumnSchema next = iterator.next();
    +        if ((null == next.getAggFunction() || next.getAggFunction().isEmpty())) {
    +          return next;
    +        }
    +      }
    +    }
    +    return null;
    +  }
    +
    +  /**
    +   * Below method will be used to get the columns on which aggregate function is not applied
    --- End diff --
   
    Please change the description
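
The two lookups under review differ only in their filter. One skips child columns that carry an aggregate function, and the other also skips columns that carry a timeseries function. That is the distinction the suggested names `getNonAggTimeChildColBasedByParent` and `getNonAggNonTimeChildColBasedByParent` capture. The following is a minimal sketch of both lookups over a parent-to-children map. `ChildColumn`, `ChildColumnLookup` and the `Optional` return type are hypothetical stand-ins, not CarbonData's API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for ColumnSchema with only the fields the lookups need.
final class ChildColumn {
  final String name;
  final String aggFunction;        // e.g. "sum"; "" or null when absent
  final String timeSeriesFunction; // e.g. "hour"; "" or null when absent

  ChildColumn(String name, String aggFunction, String timeSeriesFunction) {
    this.name = name;
    this.aggFunction = aggFunction;
    this.timeSeriesFunction = timeSeriesFunction;
  }

  boolean hasAgg() {
    return aggFunction != null && !aggFunction.isEmpty();
  }

  boolean hasTime() {
    return timeSeriesFunction != null && !timeSeriesFunction.isEmpty();
  }
}

final class ChildColumnLookup {
  // Parent column name -> child columns derived from it.
  private final Map<String, Set<ChildColumn>> parentToChildren = new HashMap<>();

  void add(String parentColumn, ChildColumn child) {
    parentToChildren.computeIfAbsent(parentColumn, k -> new LinkedHashSet<>()).add(child);
  }

  private Set<ChildColumn> children(String parentColumn) {
    return parentToChildren.getOrDefault(parentColumn, Collections.emptySet());
  }

  // Skips aggregate columns only; a timeseries column can be returned.
  Optional<ChildColumn> nonAggChild(String parentColumn) {
    return children(parentColumn).stream().filter(c -> !c.hasAgg()).findFirst();
  }

  // Skips both aggregate and timeseries columns.
  Optional<ChildColumn> nonAggNonTimeChild(String parentColumn) {
    return children(parentColumn).stream()
        .filter(c -> !c.hasAgg() && !c.hasTime())
        .findFirst();
  }
}
```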


---

[GitHub] carbondata pull request #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support ...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1626#discussion_r155265299
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java ---
    @@ -125,6 +179,25 @@ public ColumnSchema getAggChildColByParent(String columnName,
         return null;
       }
     
    +  /**
    +   * Below method will be used to get the column schema based on parent column name
    +   * @param columName
    +   *                parent column name
    +   * @return child column schema
    +   */
    +  public ColumnSchema getTimeseriesChildColByParent(String columName, String timeseriesFunction) {
    --- End diff --
   
    I think it duplicates `getChildColByParentColName`; please remove it


---

[GitHub] carbondata pull request #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support ...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1626#discussion_r155267822
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java ---
    @@ -210,4 +291,45 @@ private void fillAggFunctionColumns(List<ColumnSchema> listOfColumns) {
         }
       }
     
    +  public boolean isTimeseriesDataMap() {
    +    return isTimeseriesDataMap;
    +  }
    +
    +  /**
    +   * Below method is to support rollup during loading the data in pre aggregate table
    +   * In case of timeseries year level table data loading can be done using month level table or any
    +   * time series level below year level, for example day, hour, minute, second.
    +   * @TODO need to handle for pre aggregate table without timeseries
    +   *
    +   * @param aggregationDataMapSchema
    +   * @return whether aggregation data map can be selected or not
    +   */
    +  public boolean canSelectForRollup(AggregationDataMapSchema aggregationDataMapSchema) {
    +    List<ColumnSchema> listOfColumns = childSchema.getListOfColumns();
    --- End diff --
   
    Please handle the `dummy-measure` scenario
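
The Javadoc above describes the rollup rule. A coarser timeseries table, such as year, can be loaded from any existing finer one (month, day, hour, minute, second) instead of from the main table. The following is a minimal sketch that chooses the source by granularity alone. It assumes the finest-to-coarsest order of `TimeSeriesFunction`. Preferring the coarsest eligible source, which leaves the fewest rows to re-aggregate, is an assumption. The method in the diff also compares column lists, and the sketch leaves that out. `Granularity` and `RollupSelector` are hypothetical names.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical mirror of TimeSeriesFunction, declared from finest to coarsest.
enum Granularity { SECOND, MINUTE, HOUR, DAY, MONTH, YEAR }

final class RollupSelector {

  private RollupSelector() {}

  // Returns the coarsest existing granularity that is still finer than the
  // target, or empty when the target must be loaded from the main table.
  static Optional<Granularity> selectSource(Granularity target, List<Granularity> existing) {
    return existing.stream()
        .filter(g -> g.compareTo(target) < 0) // strictly finer than the target
        .max(Comparator.naturalOrder());      // coarsest of the finer levels
  }
}
```

For a year-level target with hour, day and month tables available, this picks month. For a second-level target it returns empty, so the load falls back to the main table.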


---

[GitHub] carbondata pull request #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support ...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1626#discussion_r155270004
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---
    @@ -152,6 +154,15 @@ public static CarbonTable buildFromTableInfo(TableInfo tableInfo) {
         }
         table.hasDataMapSchema =
             null != tableInfo.getDataMapSchemaList() && tableInfo.getDataMapSchemaList().size() > 0;
    +    List<DataMapSchema> dataMapSchemaList = tableInfo.getDataMapSchemaList();
    +    for (DataMapSchema dataMapSchema : dataMapSchemaList) {
    +      if (dataMapSchema instanceof AggregationDataMapSchema) {
    +        if (!table.hasTimeSeriesDataMap) {
    --- End diff --
   
    Please don't change `CarbonTable` for the timeseries datamap. Add a utility function to decide whether it has a timeseries datamap or not


---

[GitHub] carbondata pull request #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support ...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1626#discussion_r155270121
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---
    @@ -152,6 +154,15 @@ public static CarbonTable buildFromTableInfo(TableInfo tableInfo) {
         }
         table.hasDataMapSchema =
    --- End diff --
   
    Please don't change `CarbonTable` for the agg datamap. Add a utility function to decide whether it has an agg datamap or not
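
Both comments ask for a helper instead of new `CarbonTable` fields. The following is a minimal sketch of such helpers over the datamap schema list. It uses hypothetical stand-in classes in place of `DataMapSchema` and `AggregationDataMapSchema`. The names `DataMapChecks`, `hasAggregationDataMap` and `hasTimeSeriesDataMap` are illustrative only. The helpers also tolerate a null list. The diff null-checks `getDataMapSchemaList()` when computing `hasDataMapSchema`, but then iterates the same list without a guard.

```java
import java.util.List;

// Hypothetical stand-ins for DataMapSchema and AggregationDataMapSchema.
class DataMapSchemaStub {}

class AggregationDataMapSchemaStub extends DataMapSchemaStub {
  private final boolean timeseries;

  AggregationDataMapSchemaStub(boolean timeseries) {
    this.timeseries = timeseries;
  }

  boolean isTimeseriesDataMap() {
    return timeseries;
  }
}

// Hypothetical utility: derives the flags from the schema list on demand
// instead of storing them on the table object.
final class DataMapChecks {

  private DataMapChecks() {}

  static boolean hasAggregationDataMap(List<? extends DataMapSchemaStub> schemas) {
    return schemas != null
        && schemas.stream().anyMatch(s -> s instanceof AggregationDataMapSchemaStub);
  }

  static boolean hasTimeSeriesDataMap(List<? extends DataMapSchemaStub> schemas) {
    return schemas != null
        && schemas.stream()
            .filter(s -> s instanceof AggregationDataMapSchemaStub)
            .anyMatch(s -> ((AggregationDataMapSchemaStub) s).isTimeseriesDataMap());
  }
}
```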


---

[GitHub] carbondata pull request #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support ...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1626#discussion_r155271321
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java ---
    @@ -70,8 +70,15 @@ public AggregateTableSelector(QueryPlan queryPlan, CarbonTable parentTable) {
             AggregationDataMapSchema aggregationDataMapSchema = (AggregationDataMapSchema) dmSchema;
             isMatch = true;
             for (QueryColumn queryColumn : projectionColumn) {
    -          ColumnSchema columnSchemaByParentName = aggregationDataMapSchema
    -              .getNonAggChildColBasedByParent(queryColumn.getColumnSchema().getColumnName());
    +          ColumnSchema columnSchemaByParentName = null;
    --- End diff --
   
    Please refactor to remove duplicate code


---

[GitHub] carbondata pull request #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support ...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1626#discussion_r155271792
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunction.java ---
    @@ -21,20 +21,46 @@
      * enum for timeseries function
      */
     public enum TimeSeriesFunction {
    -  SECOND("second"),
    -  MINUTE("minute"),
    -  HOUR("hour"),
    -  DAY("day"),
    -  MONTH("month"),
    -  YEAR("year");
    +  SECOND("second", 0),
    +  MINUTE("minute", 1),
    +  HOUR("hour", 2),
    +  DAY("day", 3),
    +  MONTH("month", 4),
    +  YEAR("year", 5);
     
       private String name;
     
    -  TimeSeriesFunction(String name) {
    +  private int ordinal;
    +
    +  TimeSeriesFunction(String name, int ordinal) {
         this.name = name;
    +    this.ordinal = ordinal;
       }
     
       public String getName() {
         return name;
       }
    +
    +  public int getOrdinal() {
    +    return ordinal;
    +  }
    +
    +//   public static TimeSeriesFunction valueOf(String name) {
    --- End diff --
   
    Remove the commented-out code
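
Java enums already expose `ordinal()` and `compareTo()` following declaration order. The hand-written `ordinal` field duplicates them unless the numbering must stay fixed independently of declaration order. Enums also get an implicit `static valueOf(String)` that matches constant names exactly and cannot be redeclared, so a lookup by the lower-case `name` needs a different method name. The following sketch works under those assumptions. `TimeSeriesLevel` and `fromName` are hypothetical names.

```java
// Hypothetical variant of TimeSeriesFunction that relies on the built-in
// ordinal()/compareTo() instead of a hand-maintained ordinal field.
enum TimeSeriesLevel {
  SECOND("second"), MINUTE("minute"), HOUR("hour"), DAY("day"), MONTH("month"), YEAR("year");

  private final String name;

  TimeSeriesLevel(String name) {
    this.name = name;
  }

  String getName() {
    return name;
  }

  // The implicit valueOf(String) matches "HOUR" but not "hour" and cannot be
  // redeclared, so the lower-case lookup gets its own name.
  static TimeSeriesLevel fromName(String value) {
    for (TimeSeriesLevel level : values()) {
      if (level.name.equalsIgnoreCase(value)) {
        return level;
      }
    }
    throw new IllegalArgumentException("Unknown timeseries function: " + value);
  }
}
```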


---

[GitHub] carbondata pull request #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support ...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1626#discussion_r155272388
 
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.integration.spark.testsuite.timeseries
    +
    +import java.sql.Timestamp
    +
    +import org.apache.spark.sql.Row
    +import org.apache.spark.sql.test.util.QueryTest
    +import org.scalatest.BeforeAndAfterAll
    +
    +class TestTimeseriesDataLoad extends QueryTest with BeforeAndAfterAll {
    +
    +  override def beforeAll: Unit = {
    +    sql("drop table if exists mainTable")
    +    sql("CREATE TABLE mainTable(mytime timestamp, name string, age int) STORED BY 'org.apache.carbondata.format'")
    +    sql("create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES ('timeseries.eventTime'='mytime', 'timeseries.hierarchy'='second=1,minute=1,hour=1,day=1,month=1,year=1') as select mytime, sum(age) from mainTable group by mytime")
    +    sql(s"LOAD DATA LOCAL INPATH 'D:/mydata.csv' into table mainTable")
    --- End diff --
   
    Change the path; `D:/mydata.csv` is a local, machine-specific path


---

[GitHub] carbondata pull request #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support ...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1626#discussion_r155275089
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -38,8 +38,20 @@ object LoadPostAggregateListener extends OperationEventListener {
         val sparkSession = loadEvent.sparkSession
         val carbonLoadModel = loadEvent.carbonLoadModel
         val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +    //    val parentTableName = table.getTableName
         if (table.hasDataMapSchema) {
    -      for (dataMapSchema: DataMapSchema <- table.getTableInfo.getDataMapSchemaList.asScala) {
    +      // getting all the aggregate datamap schemas
    +      val aggregationDataMapList = table.getTableInfo.getDataMapSchemaList.asScala
    --- End diff --
   
    Simplify to `table.getTableInfo.getDataMapSchemaList.asScala.filter(_.isInstanceOf[AggregationDataMapSchema]).map(_.asInstanceOf[AggregationDataMapSchema])`
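
In Scala, `collect { case a: AggregationDataMapSchema => a }` performs the filter and the typed cast in one step. The Java counterpart is a stream `filter(type::isInstance)` followed by `map(type::cast)`. It is sketched below with hypothetical `BaseSchema`/`AggregateSchema` classes standing in for the CarbonData schema types.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-ins for DataMapSchema and AggregationDataMapSchema.
class BaseSchema {}

class AggregateSchema extends BaseSchema {}

final class TypeFilter {

  private TypeFilter() {}

  // Keeps only elements of the given type and returns them already cast.
  static <T> List<T> ofType(List<?> items, Class<T> type) {
    return items.stream()
        .filter(type::isInstance)
        .map(type::cast)
        .collect(Collectors.toList());
  }
}
```

Because the result is already typed, later code does not need another cast.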


---

[GitHub] carbondata pull request #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support ...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1626#discussion_r155276144
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -65,6 +95,10 @@ object LoadPostAggregateListener extends OperationEventListener {
                 dataFrame = Some(childDataFrame),
                 internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true")).
                 run(sparkSession)
    +          if (dataMapSchema.isInstanceOf[AggregationDataMapSchema] &&
    --- End diff --
   
    Remove this check


---

[GitHub] carbondata pull request #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support ...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1626#discussion_r155276423
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -51,8 +63,26 @@ object LoadPostAggregateListener extends OperationEventListener {
               carbonLoadModel.getTableName, "false")
             val childTableName = dataMapSchema.getRelationIdentifier.getTableName
             val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName
    -        val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(
    -          s"${ dataMapSchema.getProperties.get("CHILD_SELECT QUERY") } ")).drop("preAggLoad")
    +        val childDataFrame =
    +          if (!dataMapSchema.asInstanceOf[AggregationDataMapSchema].isTimeseriesDataMap) {
    --- End diff --
   
    No need to use `asInstanceOf` here


---

[GitHub] carbondata pull request #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support ...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1626#discussion_r155277594
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -51,8 +63,26 @@ object LoadPostAggregateListener extends OperationEventListener {
               carbonLoadModel.getTableName, "false")
             val childTableName = dataMapSchema.getRelationIdentifier.getTableName
             val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName
    -        val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(
    -          s"${ dataMapSchema.getProperties.get("CHILD_SELECT QUERY") } ")).drop("preAggLoad")
    +        val childDataFrame =
    +          if (!dataMapSchema.asInstanceOf[AggregationDataMapSchema].isTimeseriesDataMap) {
    +            sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(
    +              s"${ dataMapSchema.getProperties.get("CHILD_SELECT QUERY") } ")).drop("preAggLoad")
    +          } else {
    +            // for timeseries rollup policy
    +            val tableSelectedForRollup = PreAggregateUtil.getRollupDataMapNameForTimeSeries(list,
    +              dataMapSchema.asInstanceOf[AggregationDataMapSchema])
    +            // if none of the rollup data maps is selected, hit the main table and prepare the query
    +            val childQuery = if (!tableSelectedForRollup.isDefined) {
    --- End diff --
   
    Use `isEmpty` instead of `!isDefined`
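
`Option.isEmpty` directly replaces `!isDefined`. The whole if/else can also be written as `map(...).getOrElse(...)` on the `Option`. The Java sketch below shows the same fallback, where `Optional.isPresent()` plays the role of `isDefined`. It builds the query from the selected rollup table when one exists, and from the main table otherwise. `LoadQueryChooser` and its parameters are hypothetical.

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

final class LoadQueryChooser {

  private LoadQueryChooser() {}

  // Uses the rollup source when one was selected, otherwise falls back to
  // the main-table query; no explicit isPresent()/isDefined check needed.
  static String chooseQuery(Optional<String> rollupTable,
      Function<String, String> queryFromRollup,
      Supplier<String> queryFromMain) {
    return rollupTable.map(queryFromRollup).orElseGet(queryFromMain);
  }
}
```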


---

[GitHub] carbondata pull request #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support ...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1626#discussion_r155278666
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -493,4 +492,102 @@ object PreAggregateUtil {
         updatedPlan
       }
     
    +  /**
    +   * Below method will be used to get the select query when rollup policy is
    +   * applied in case of timeseries table
    +   * @param tableSchema
    +   *                    main data map schema
    +   * @param selectedDataMapSchema
    +   *                              selected data map schema for rollup
    +   * @return select query based on rollup
    +   */
    +  def createTimeseriesSelectQueryForRollup(
    +      tableSchema: TableSchema,
    +      selectedDataMapSchema: AggregationDataMapSchema): String = {
    +    val aggregateColumns = scala.collection.mutable.ArrayBuffer.empty[String]
    +    val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String]
    +    tableSchema.getListOfColumns.asScala.foreach {
    +      a => if (a.getAggFunction.nonEmpty) {
    --- End diff --
   
    Move `a =>` up to the opening line (`foreach { a =>`)


---

[GitHub] carbondata pull request #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support ...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1626#discussion_r155279264
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -493,4 +492,102 @@ object PreAggregateUtil {
         updatedPlan
       }
     
    +  /**
    +   * Below method will be used to get the select query when rollup policy is
    +   * applied in case of timeseries table
    +   * @param tableSchema
    +   *                    main data map schema
    +   * @param selectedDataMapSchema
    +   *                              selected data map schema for rollup
    +   * @return select query based on rollup
    +   */
    +  def createTimeseriesSelectQueryForRollup(
    --- End diff --
   
    After the compaction PR is merged, this duplicate function should be merged into the function there.


---

[GitHub] carbondata pull request #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support ...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1626#discussion_r155279382
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -493,4 +492,102 @@ object PreAggregateUtil {
         updatedPlan
       }
     
    +  /**
    +   * Below method will be used to get the select query when rollup policy is
    +   * applied in case of timeseries table
    +   * @param tableSchema
    +   *                    main data map schema
    +   * @param selectedDataMapSchema
    +   *                              selected data map schema for rollup
    +   * @return select query based on rollup
    +   */
    +  def createTimeseriesSelectQueryForRollup(
    +      tableSchema: TableSchema,
    +      selectedDataMapSchema: AggregationDataMapSchema): String = {
    +    val aggregateColumns = scala.collection.mutable.ArrayBuffer.empty[String]
    +    val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String]
    +    tableSchema.getListOfColumns.asScala.foreach {
    +      a => if (a.getAggFunction.nonEmpty) {
    +        aggregateColumns += s"${a.getAggFunction match {
    +          case "count" => "sum"
    +          case others@_ => others}}(${selectedDataMapSchema.getAggChildColByParent(
    +          a.getParentColumnTableRelations.get(0).getColumnName, a.getAggFunction).getColumnName})"
    +      } else if (a.getTimeSeriesFunction.nonEmpty) {
    +        groupingExpressions += s"timeseries(${
    +          selectedDataMapSchema
    +            .getNonAggNonTimeChildColBasedByParent(a.getParentColumnTableRelations.
    +              get(0).getColumnName).getColumnName
    +        } , '${ a.getTimeSeriesFunction }')"
    +      } else {
    +        groupingExpressions += selectedDataMapSchema
    +          .getNonAggNonTimeChildColBasedByParent(a.getParentColumnTableRelations.
    +            get(0).getColumnName).getColumnName
    +      }
    +    }
    +    s"select ${ groupingExpressions.mkString(",") },${ aggregateColumns.mkString(",")
    +    } from ${selectedDataMapSchema.getChildSchema.getTableName } " +
    +    s"group by ${ groupingExpressions.mkString(",") }"
    +  }
    +
    +  /**
    +   * Below method will be used to create the select query for timeseries
    +   * at the lowest aggregation level, like second level; in that case it will
    +   * hit the maintable
    +   * @param tableSchema
    +   *                    data map schema
    +   * @param parentTableName
    +   *                        parent schema
    +   * @return select query for loading
    +   */
    +  def createTimeSeriesSelectQueryFromMain(tableSchema: TableSchema,
    +      parentTableName: String): String = {
    +    val aggregateColumns = scala.collection.mutable.ArrayBuffer.empty[String]
    +    val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String]
    +    tableSchema.getListOfColumns.asScala.foreach {
    +      a =>
    +        if (a.getAggFunction.nonEmpty) {
    +          aggregateColumns +=
    +          s"${ a.getAggFunction }(${ a.getParentColumnTableRelations.get(0).getColumnName })"
    +        } else if (a.getTimeSeriesFunction.nonEmpty) {
    +          groupingExpressions +=
    +          s"timeseries(${ a.getParentColumnTableRelations.get(0).getColumnName },'${
    +            a
    +              .getTimeSeriesFunction
    +          }')"
    --- End diff --
   
    Please format this properly
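
Both builders in this file, `createTimeseriesSelectQueryForRollup` and `createTimeSeriesSelectQueryFromMain`, walk the datamap columns the same way. Aggregate columns go to the select list, timeseries columns become `timeseries(col, 'level')` grouping keys, and the remaining columns are plain grouping keys. The differences are the source table and the fact that a rollup must re-aggregate partial counts with `sum`.

The following sketch shows one shared builder under those assumptions. It assumes only count, sum, min and max aggregates, and that the caller passes column names as they appear in the source table. `AggColumn`, `TimeseriesQueryBuilder` and the `fromRollup` flag are hypothetical. The builder joins grouping keys and aggregates into a single select list, so no dangling comma appears when one of them is empty.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical column descriptor standing in for ColumnSchema.
final class AggColumn {
  final String column;       // column name in the source table
  final String aggFunction;  // e.g. "sum", "count"; "" when not an aggregate
  final String timeFunction; // e.g. "hour", "day"; "" when not a timeseries key

  AggColumn(String column, String aggFunction, String timeFunction) {
    this.column = column;
    this.aggFunction = aggFunction;
    this.timeFunction = timeFunction;
  }
}

final class TimeseriesQueryBuilder {

  private TimeseriesQueryBuilder() {}

  // Partial counts are re-aggregated by summing them; sum, min and max keep
  // their own function when rolled up.
  static String rollupFunction(String aggFunction) {
    return "count".equalsIgnoreCase(aggFunction) ? "sum" : aggFunction;
  }

  static String build(List<AggColumn> columns, String sourceTable, boolean fromRollup) {
    List<String> groupBy = new ArrayList<>();
    List<String> aggregates = new ArrayList<>();
    for (AggColumn c : columns) {
      if (!c.aggFunction.isEmpty()) {
        String fn = fromRollup ? rollupFunction(c.aggFunction) : c.aggFunction;
        aggregates.add(fn + "(" + c.column + ")");
      } else if (!c.timeFunction.isEmpty()) {
        groupBy.add("timeseries(" + c.column + ", '" + c.timeFunction + "')");
      } else {
        groupBy.add(c.column);
      }
    }
    // One combined select list avoids a dangling comma when a list is empty.
    List<String> select = new ArrayList<>(groupBy);
    select.addAll(aggregates);
    StringBuilder sql = new StringBuilder("select ")
        .append(String.join(", ", select))
        .append(" from ")
        .append(sourceTable);
    if (!groupBy.isEmpty()) {
      sql.append(" group by ").append(String.join(", ", groupBy));
    }
    return sql.toString();
  }
}
```

Called with `fromRollup = true`, a `count` column is emitted as `sum(...)` over the source table's partial counts; the other aggregates keep their function.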


---