[GitHub] carbondata pull request #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support ...

qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1626#discussion_r155279540
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -493,4 +492,102 @@ object PreAggregateUtil {
         updatedPlan
       }
     
    +  /**
    +   * Below method will be used to get the select query when rollup policy is
    +   * applied in case of timeseries table
    +   * @param tableSchema
    +   *                    main data map schema
    +   * @param selectedDataMapSchema
    +   *                              selected data map schema for rollup
    +   * @return select query based on rollup
    +   */
    +  def createTimeseriesSelectQueryForRollup(
    +      tableSchema: TableSchema,
    +      selectedDataMapSchema: AggregationDataMapSchema): String = {
    +    val aggregateColumns = scala.collection.mutable.ArrayBuffer.empty[String]
    +    val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String]
    +    tableSchema.getListOfColumns.asScala.foreach {
    +      a => if (a.getAggFunction.nonEmpty) {
    +        aggregateColumns += s"${a.getAggFunction match {
    +          case "count" => "sum"
    +          case others@_ => others}}(${selectedDataMapSchema.getAggChildColByParent(
    +          a.getParentColumnTableRelations.get(0).getColumnName, a.getAggFunction).getColumnName})"
    +      } else if (a.getTimeSeriesFunction.nonEmpty) {
    +        groupingExpressions += s"timeseries(${
    +          selectedDataMapSchema
    +            .getNonAggNonTimeChildColBasedByParent(a.getParentColumnTableRelations.
    +              get(0).getColumnName).getColumnName
    +        } , '${ a.getTimeSeriesFunction }')"
    +      } else {
    +        groupingExpressions += selectedDataMapSchema
    +          .getNonAggNonTimeChildColBasedByParent(a.getParentColumnTableRelations.
    +            get(0).getColumnName).getColumnName
    +      }
    +    }
    +    s"select ${ groupingExpressions.mkString(",") },${ aggregateColumns.mkString(",")
    +    } from ${selectedDataMapSchema.getChildSchema.getTableName } " +
    +    s"group by ${ groupingExpressions.mkString(",") }"
    +  }
    +
    +  /**
    +   * Below method will be used to create the select query for timeseries
    +   * for the lowest aggregation level, such as second level; in that case it will
    +   * hit the main table
    +   * @param tableSchema
    +   *                    data map schema
    +   * @param parentTableName
    +   *                        parent schema
    +   * @return select query for loading
    +   */
    +  def createTimeSeriesSelectQueryFromMain(tableSchema: TableSchema,
    +      parentTableName: String): String = {
    +    val aggregateColumns = scala.collection.mutable.ArrayBuffer.empty[String]
    +    val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String]
    +    tableSchema.getListOfColumns.asScala.foreach {
    +      a =>
    +        if (a.getAggFunction.nonEmpty) {
    +          aggregateColumns +=
    +          s"${ a.getAggFunction }(${ a.getParentColumnTableRelations.get(0).getColumnName })"
    +        } else if (a.getTimeSeriesFunction.nonEmpty) {
    +          groupingExpressions +=
    +          s"timeseries(${ a.getParentColumnTableRelations.get(0).getColumnName },'${
    +            a
    +              .getTimeSeriesFunction
    +          }')"
    +        } else {
    +          groupingExpressions += a.getParentColumnTableRelations.get(0).getColumnName
    +        }
    +    }
    +    s"select ${ groupingExpressions.mkString(",") },${
    +      aggregateColumns.mkString(",")
    +    } from ${ parentTableName } group by ${ groupingExpressions.mkString(",") }"
    +
    +  }
    +    /**
    +   * Below method will be used to select rollup table in case of
    +   * timeseries data map loading
    +   * @param list
    +   *             list of timeseries datamap
    +   * @param dataMapSchema
    +   *                      datamap schema
    +   * @return select table name
    +   */
    +  def getRollupDataMapNameForTimeSeries(
    +      list: scala.collection.mutable.ListBuffer[AggregationDataMapSchema],
    +      dataMapSchema: AggregationDataMapSchema): Option[AggregationDataMapSchema] = {
    +    if (list.isEmpty) {
    +      None
    +    } else {
    +      val rollupDataMapSchema = scala.collection.mutable.ListBuffer.empty[AggregationDataMapSchema]
    +      list.foreach{f =>
    +        if (dataMapSchema.canSelectForRollup(f)) {
    +          rollupDataMapSchema += f
    +        } }
    +      if(rollupDataMapSchema.isEmpty) {
    --- End diff --
   
    Use `lastOption` instead of the if/else.
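
A minimal sketch of what the `lastOption` suggestion above could look like. The quoted diff is cut off at the `isEmpty` check, so this assumes the remaining branch returns the last eligible schema. `Schema`, its `level` field, the rule inside `canSelectForRollup`, and `RollupSelection.pickRollup` are hypothetical stand-ins for illustration, not the CarbonData classes.

```scala
// Hypothetical stand-in for AggregationDataMapSchema; only a name and a
// granularity rank are modeled (lower rank = finer granularity).
final case class Schema(name: String, level: Int) {
  // Assumed rule for this sketch only: a candidate can feed a rollup when it
  // is strictly finer-grained than this schema.
  def canSelectForRollup(other: Schema): Boolean = other.level < level
}

object RollupSelection {
  // filter keeps the eligible candidates in their original order.
  // lastOption returns None for an empty result and Some(last) otherwise,
  // so neither the outer list.isEmpty check nor the inner isEmpty branch
  // is needed.
  def pickRollup(candidates: Seq[Schema], target: Schema): Option[Schema] =
    candidates.filter(c => target.canSelectForRollup(c)).lastOption
}
```

With candidates ordered from finest to coarsest, `pickRollup` would return the coarsest table that can still serve the target, and `None` when nothing qualifies.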


---

[GitHub] carbondata pull request #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1626#discussion_r155279800
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUDF.scala ---
    @@ -0,0 +1,33 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.command.timeseries
    +
    +import java.sql.Timestamp
    +
    +import org.apache.carbondata.core.preagg.TimeSeriesUDF
    +
    +/**
    + * Time series udf class
    + */
    +
    +class TimeseriesUDf extends Function2[Timestamp, String, Timestamp] with Serializable{
    --- End diff --
   
    Please rename this class to avoid duplicate names: `TimeseriesUDf` differs from the imported `TimeSeriesUDF` only by letter case.


---

[GitHub] carbondata pull request #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1626#discussion_r155282425
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -321,6 +313,62 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         }
       }
     
    +  /**
    +   * Below method will be used to extract the query columns from
    +   * filter expression
    +   * @param filterExp
    +   *                  filter expression
    +   * @param set
    +   *             query column list
    +   * @param carbonTable
    +   *                    parent table
    +   * @param tableName
    +   *                  table name
    +   * @return isvalid filter expression for aggregate
    +   */
    +  def extractQueryColumnFromFilterExp(filterExp: Expression,
    +      set: scala.collection.mutable.HashSet[QueryColumn],
    +      carbonTable: CarbonTable, tableName: String): Boolean = {
    +    val newFilterList = scala.collection.mutable.HashMap.empty[AttributeReference, String]
    --- End diff --
   
    rename to map


---

[GitHub] carbondata pull request #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1626#discussion_r155282532
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -321,6 +313,62 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         }
       }
     
    +  /**
    +   * Below method will be used to extract the query columns from
    +   * filter expression
    +   * @param filterExp
    +   *                  filter expression
    +   * @param set
    +   *             query column list
    +   * @param carbonTable
    +   *                    parent table
    +   * @param tableName
    +   *                  table name
    +   * @return isvalid filter expression for aggregate
    +   */
    +  def extractQueryColumnFromFilterExp(filterExp: Expression,
    +      set: scala.collection.mutable.HashSet[QueryColumn],
    +      carbonTable: CarbonTable, tableName: String): Boolean = {
    +    val newFilterList = scala.collection.mutable.HashMap.empty[AttributeReference, String]
    +    var isValidPlan = true
    +    filterExp.transform {
    +      case attr: AttributeReference =>
    +        if (!newFilterList.get(attr).isDefined) {
    +          newFilterList.put(attr, null)
    +        }
    +        attr
    +      case udf@ScalaUDF(_, _, _, _) =>
    +        if (udf.function.getClass.getName
    +          .equalsIgnoreCase("org.apache.spark.sql.execution.command.timeseries.TimeseriesUDf") &&
    +            carbonTable.hasTimeSeriesDataMap) {
    +          newFilterList.put(udf.children(0).asInstanceOf[AttributeReference],
    +            udf.children(1).asInstanceOf[Literal].value.toString)
    +        } else {
    +          udf.transform {
    +            case attr: AttributeReference =>
    +              if (!newFilterList.get(attr).isDefined) {
    +                newFilterList.put(attr, null)
    +              }
    +              attr
    +          }
    +        }
    +        udf
    +    }
    +    newFilterList.foreach {
    +      f =>
    --- End diff --
   
    format properly


---

[GitHub] carbondata pull request #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1626#discussion_r155282655
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -321,6 +313,62 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         }
       }
     
    +  /**
    +   * Below method will be used to extract the query columns from
    +   * filter expression
    +   * @param filterExp
    +   *                  filter expression
    +   * @param set
    +   *             query column list
    +   * @param carbonTable
    +   *                    parent table
    +   * @param tableName
    +   *                  table name
    +   * @return isvalid filter expression for aggregate
    +   */
    +  def extractQueryColumnFromFilterExp(filterExp: Expression,
    +      set: scala.collection.mutable.HashSet[QueryColumn],
    +      carbonTable: CarbonTable, tableName: String): Boolean = {
    +    val newFilterList = scala.collection.mutable.HashMap.empty[AttributeReference, String]
    --- End diff --
   
    Add a comment explaining why this is required.


---

[GitHub] carbondata pull request #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1626#discussion_r155283460
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -725,6 +786,31 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             Alias(aggExp,
               name)(NamedExpression.newExprId,
               alias.qualifier).asInstanceOf[NamedExpression]
    +      case alias@Alias(expression: Expression, name) =>
    +        val updatedExp =
    +          if (expression.isInstanceOf[ScalaUDF] &&
    +              expression.asInstanceOf[ScalaUDF].function.getClass.getName.equalsIgnoreCase(
    +                "org.apache.spark.sql.execution.command.timeseries.TimeseriesUDf")) {
    +          expression.asInstanceOf[ScalaUDF].transform {
    +            case attr: AttributeReference =>
    +            val childAttributeReference = getChildAttributeReference(aggDataMapSchema,
    +              attr,
    +              attributes,
    +              timeseriesFunction =
    --- End diff --
   
    remove it


---

[GitHub] carbondata issue #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support Query a...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1626
 
    Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/525/



---

[GitHub] carbondata issue #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support Query a...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1626
 
    Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/529/



---

[GitHub] carbondata issue #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support Query a...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1626
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2150/



---

[GitHub] carbondata issue #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support Query a...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1626
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1778/



---

[GitHub] carbondata issue #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support Query a...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1626
 
    Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/535/



---

[GitHub] carbondata issue #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support Query a...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1626
 
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1786/



---

[GitHub] carbondata issue #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support Query a...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1626
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2157/



---

[GitHub] carbondata issue #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support Query a...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1626
 
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2160/



---

[GitHub] carbondata issue #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support Query a...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1626
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2167/



---

[GitHub] carbondata issue #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support Query a...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1626
 
    Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/542/



---

[GitHub] carbondata pull request #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kumarvishal09 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1626#discussion_r155429834
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java ---
    @@ -210,4 +291,45 @@ private void fillAggFunctionColumns(List<ColumnSchema> listOfColumns) {
         }
       }
     
    +  public boolean isTimeseriesDataMap() {
    +    return isTimeseriesDataMap;
    +  }
    +
    +  /**
    +   * Below method is to support rollup during loading the data in pre aggregate table
    +   * In case of timeseries, year level table data loading can be done using the month level table or any
    +   * time series level below year level, for example day, hour, minute, second.
    +   * @TODO need to handle for pre aggregate table without timeseries
    +   *
    +   * @param aggregationDataMapSchema
    +   * @return whether aggregation data map can be selected or not
    +   */
    +  public boolean canSelectForRollup(AggregationDataMapSchema aggregationDataMapSchema) {
    +    List<ColumnSchema> listOfColumns = childSchema.getListOfColumns();
    --- End diff --
   
    handled in query preparation


---

[GitHub] carbondata issue #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support Query a...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1626
 
    Add PR description


---

[GitHub] carbondata pull request #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1626#discussion_r155437464
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---
    @@ -719,4 +719,34 @@ public long size() throws IOException {
         }
         return dataSize + indexSize;
       }
    +
    +  /**
    +   * Utility function to check whether table has timeseries datamap or not
    +   * @param carbonTable
    +   * @return timeseries data map present
    +   */
    +  public static boolean hasTimeSeriesDataMap(CarbonTable carbonTable) {
    --- End diff --
   
    Please move them to utility


---

[GitHub] carbondata pull request #1626: [CARBONDATA-1519][PreAgg-Timeseries] Support ...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1626#discussion_r155437483
 
    --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---
    @@ -719,4 +719,34 @@ public long size() throws IOException {
         }
         return dataSize + indexSize;
       }
    +
    +  /**
    +   * Utility function to check whether table has timeseries datamap or not
    +   * @param carbonTable
    +   * @return timeseries data map present
    +   */
    +  public static boolean hasTimeSeriesDataMap(CarbonTable carbonTable) {
    +    List<DataMapSchema> dataMapSchemaList = carbonTable.getTableInfo().getDataMapSchemaList();
    +    for (DataMapSchema dataMapSchema : dataMapSchemaList) {
    +      if (dataMapSchema instanceof AggregationDataMapSchema) {
    +        return ((AggregationDataMapSchema) dataMapSchema).isTimeseriesDataMap();
    +      }
    +    }
    +    return false;
    +  }
    +
    +  /**
    +   * Utility function to check whether table has aggregation datamap or not
    +   * @param carbonTable
    +   * @return aggregation data map present
    +   */
    +  public static boolean hasAggregationDataMap(CarbonTable carbonTable) {
    --- End diff --
   
    Please move to utility
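
A rough sketch of how the two checks might look once moved into a separate utility, written in Scala for consistency with the other diffs in this thread (the quoted code is Java). The thread does not name the target utility class, so `DataMapChecks` and the stand-in schema types are hypothetical. The body of `hasAggregationDataMap` is not shown in the diff, so its behavior here is assumed from its name. Note also that the quoted `hasTimeSeriesDataMap` loop returns as soon as it meets the first `AggregationDataMapSchema`, whereas this sketch inspects every schema; whether that difference is wanted is not stated in the thread.

```scala
// Hypothetical stand-ins for DataMapSchema and AggregationDataMapSchema.
sealed trait DataMapSchemaSketch
final case class PlainDataMap(name: String) extends DataMapSchemaSketch
final case class AggregationDataMap(name: String, isTimeseries: Boolean)
  extends DataMapSchemaSketch

// Hypothetical utility holder: the helpers take the schema list directly
// instead of living as static methods on the table class.
object DataMapChecks {
  // True when at least one schema is an aggregation datamap
  // (assumed behavior; the original body is not quoted).
  def hasAggregationDataMap(schemas: Seq[DataMapSchemaSketch]): Boolean =
    schemas.exists(_.isInstanceOf[AggregationDataMap])

  // True when any aggregation datamap is a timeseries datamap.
  // exists checks every element rather than stopping at the first
  // aggregation schema.
  def hasTimeSeriesDataMap(schemas: Seq[DataMapSchemaSketch]): Boolean =
    schemas.exists {
      case AggregationDataMap(_, isTimeseries) => isTimeseries
      case _ => false
    }
}
```

A caller would pass the table's datamap schema list, for example the result of `carbonTable.getTableInfo().getDataMapSchemaList()` from the quoted diff, converted to a Scala collection.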


---