[GitHub] carbondata pull request #1694: [WIP]Added code to support case expression

classic Classic list List threaded Threaded
48 messages Options
123
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1694: [CARBONDATA-1925][Pre-Aggregate]Added code to...

qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1694#discussion_r158932166
 
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala ---
    @@ -0,0 +1,44 @@
    +package org.apache.carbondata.integration.spark.testsuite.preaggregate
    +
    +import org.apache.spark.sql.test.util.QueryTest
    +import org.scalatest.BeforeAndAfterAll
    +
    +class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll {
    +
    +  override def beforeAll: Unit = {
    +    sql("drop table if exists mainTable")
    +    sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
    +    sql("create datamap agg0 on table mainTable using 'preaggregate' as select name,count(age) from mainTable group by name")
    +    sql("create datamap agg1 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end) from mainTable group by name")
    +    sql("create datamap agg2 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end),city from mainTable group by name,city")
    +    sql("create datamap agg3 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end) from mainTable group by name")
    +    sql("create datamap agg4 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from mainTable group by name")
    +    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
    +  }
    +
    +  test("test pre agg create table with expression 1") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0"), true, "maintable_age_count")
    +  }
    +
    +  test("test pre agg create table with expression 2") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg1"), true, "maintable_column_0_sum")
    +  }
    +
    +  test("test pre agg create table with expression 3") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg2"), true, "maintable_column_0_sum")
    +  }
    +
    +  test("test pre agg create table with expression 4") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg3"), true, "maintable_column_0_sum")
    --- End diff --
   
    I think you need to verify that the select query actually hits the pre-agg table, not just check that the table exists.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1694: [CARBONDATA-1925][Pre-Aggregate]Added code to...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1694#discussion_r158932405
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -166,127 +208,160 @@ object PreAggregateUtil {
           aggFunctions: AggregateFunction,
           parentTableName: String,
           parentDatabaseName: String,
    -      parentTableId: String) : scala.collection.mutable.ListBuffer[(Field, DataMapField)] = {
    +      parentTableId: String,
    +      newColumnName: String) : scala.collection.mutable.ListBuffer[(Field, DataMapField)] = {
    --- End diff --
   
    What is this field (`newColumnName`)? Please add a comment for this function.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1694: [CARBONDATA-1925][Pre-Aggregate]Added code to...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1694#discussion_r158932436
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -166,127 +208,160 @@ object PreAggregateUtil {
           aggFunctions: AggregateFunction,
           parentTableName: String,
           parentDatabaseName: String,
    -      parentTableId: String) : scala.collection.mutable.ListBuffer[(Field, DataMapField)] = {
    +      parentTableId: String,
    +      newColumnName: String) : scala.collection.mutable.ListBuffer[(Field, DataMapField)] = {
         val list = scala.collection.mutable.ListBuffer.empty[(Field, DataMapField)]
         aggFunctions match {
    -      case sum@Sum(attr: AttributeReference) =>
    -        list += getField(attr.name,
    -          attr.dataType,
    -          sum.prettyName,
    -          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    -          parentTableName,
    -          parentDatabaseName, parentTableId = parentTableId)
    -      case sum@Sum(Cast(attr: AttributeReference, changeDataType: DataType)) =>
    -        list += getField(attr.name,
    +      case sum@Sum(MatchCastExpression(exp: Expression, changeDataType: DataType)) =>
    +        list += getFieldForAggregateExpression(exp,
               changeDataType,
    -          sum.prettyName,
    -          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    -          parentTableName,
    -          parentDatabaseName, parentTableId = parentTableId)
    -      case count@Count(Seq(attr: AttributeReference)) =>
    -        list += getField(attr.name,
    -          attr.dataType,
    -          count.prettyName,
    -          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    -          parentTableName,
    -          parentDatabaseName, parentTableId = parentTableId)
    -      case count@Count(Seq(Cast(attr: AttributeReference, _))) =>
    -        list += getField(attr.name,
    -          attr.dataType,
    -          count.prettyName,
    -          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    -          parentTableName,
    -          parentDatabaseName, parentTableId = parentTableId)
    -      case min@Min(attr: AttributeReference) =>
    -        list += getField(attr.name,
    -          attr.dataType,
    -          min.prettyName,
    -          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    -          parentTableName,
    -          parentDatabaseName, parentTableId = parentTableId)
    -      case min@Min(Cast(attr: AttributeReference, changeDataType: DataType)) =>
    -        list += getField(attr.name,
    +          carbonTable,
    +          newColumnName,
    +          sum.prettyName)
    +      case sum@Sum(exp: Expression) =>
    +        list += getFieldForAggregateExpression(exp,
    --- End diff --
   
    Move the first parameter to the next line.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1694: [CARBONDATA-1925][Pre-Aggregate]Added code to...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1694#discussion_r158932905
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -126,19 +126,33 @@ object PreAggregateUtil {
               attr.aggregateFunction,
               parentTableName,
               parentDatabaseName,
    -          parentTableId)
    +          parentTableId,
    +          "column_" + counter)
    +        counter = counter + 1
           case attr: AttributeReference =>
    +        val columnRelation = getColumnRelation(attr.name,
    +          parentTableId,
    +          parentTableName,
    +          parentDatabaseName,
    +          carbonTable)
    +        val arrayBuffer = new ArrayBuffer[ColumnTableRelation]()
    +        arrayBuffer += columnRelation
             fieldToDataMapFieldMap += getField(attr.name,
               attr.dataType,
    -          parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
               parentTableName = parentTableName,
    -          parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
    +          columnTableRelationList = arrayBuffer.toList)
           case Alias(attr: AttributeReference, _) =>
    +        val columnRelation = getColumnRelation(attr.name,
    --- End diff --
   
    I think `getColumnRelation` is not needed; you can create a new `ColumnTableRelation` directly here, since the parameters are almost the same.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1694: [CARBONDATA-1925][Pre-Aggregate]Added code to...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1694#discussion_r158933435
 
    --- Diff: integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala ---
    @@ -79,7 +79,7 @@ case class Field(column: String, var dataType: Option[String], name: Option[Stri
     }
     
     case class DataMapField(var aggregateFunction: String = "",
    -    columnTableRelation: Option[ColumnTableRelation] = None) {
    +    columnTableRelationList: Option[List[ColumnTableRelation]] = None) {
    --- End diff --
   
    Use `Seq` instead of `List`.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1694: [CARBONDATA-1925][Pre-Aggregate]Added code to...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1694#discussion_r158933775
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -166,127 +208,160 @@ object PreAggregateUtil {
           aggFunctions: AggregateFunction,
           parentTableName: String,
           parentDatabaseName: String,
    -      parentTableId: String) : scala.collection.mutable.ListBuffer[(Field, DataMapField)] = {
    +      parentTableId: String,
    +      newColumnName: String) : scala.collection.mutable.ListBuffer[(Field, DataMapField)] = {
         val list = scala.collection.mutable.ListBuffer.empty[(Field, DataMapField)]
         aggFunctions match {
    -      case sum@Sum(attr: AttributeReference) =>
    -        list += getField(attr.name,
    -          attr.dataType,
    -          sum.prettyName,
    -          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    -          parentTableName,
    -          parentDatabaseName, parentTableId = parentTableId)
    -      case sum@Sum(Cast(attr: AttributeReference, changeDataType: DataType)) =>
    -        list += getField(attr.name,
    +      case sum@Sum(MatchCastExpression(exp: Expression, changeDataType: DataType)) =>
    +        list += getFieldForAggregateExpression(exp,
               changeDataType,
    -          sum.prettyName,
    -          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    -          parentTableName,
    -          parentDatabaseName, parentTableId = parentTableId)
    -      case count@Count(Seq(attr: AttributeReference)) =>
    -        list += getField(attr.name,
    -          attr.dataType,
    -          count.prettyName,
    -          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    -          parentTableName,
    -          parentDatabaseName, parentTableId = parentTableId)
    -      case count@Count(Seq(Cast(attr: AttributeReference, _))) =>
    -        list += getField(attr.name,
    -          attr.dataType,
    -          count.prettyName,
    -          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    -          parentTableName,
    -          parentDatabaseName, parentTableId = parentTableId)
    -      case min@Min(attr: AttributeReference) =>
    -        list += getField(attr.name,
    -          attr.dataType,
    -          min.prettyName,
    -          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    -          parentTableName,
    -          parentDatabaseName, parentTableId = parentTableId)
    -      case min@Min(Cast(attr: AttributeReference, changeDataType: DataType)) =>
    -        list += getField(attr.name,
    +          carbonTable,
    +          newColumnName,
    +          sum.prettyName)
    +      case sum@Sum(exp: Expression) =>
    +        list += getFieldForAggregateExpression(exp,
    +          sum.dataType,
    +          carbonTable,
    +          newColumnName,
    +          sum.prettyName)
    +      case count@Count(Seq(MatchCastExpression(exp: Expression, changeDataType: DataType))) =>
    +        list += getFieldForAggregateExpression(exp,
               changeDataType,
    -          min.prettyName,
    -          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    -          parentTableName,
    -          parentDatabaseName, parentTableId = parentTableId)
    -      case max@Max(attr: AttributeReference) =>
    -        list += getField(attr.name,
    -          attr.dataType,
    -          max.prettyName,
    -          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    -          parentTableName,
    -          parentDatabaseName, parentTableId = parentTableId)
    -      case max@Max(Cast(attr: AttributeReference, changeDataType: DataType)) =>
    -        list += getField(attr.name,
    +          carbonTable,
    +          newColumnName,
    +          count.prettyName)
    +      case count@Count(Seq(expression: Expression)) =>
    +        list += getFieldForAggregateExpression(expression,
    +          count.dataType,
    +          carbonTable,
    +          newColumnName,
    +          count.prettyName)
    +      case min@Min(MatchCastExpression(exp: Expression, changeDataType: DataType)) =>
    +        list += getFieldForAggregateExpression(exp,
               changeDataType,
    -          max.prettyName,
    -          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    -          parentTableName,
    -          parentDatabaseName, parentTableId = parentTableId)
    -      case Average(attr: AttributeReference) =>
    -        list += getField(attr.name,
    -          attr.dataType,
    -          "sum",
    -          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    -          parentTableName,
    -          parentDatabaseName, parentTableId = parentTableId)
    -        list += getField(attr.name,
    -          attr.dataType,
    -          "count",
    -          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    -          parentTableName,
    -          parentDatabaseName, parentTableId = parentTableId)
    -      case Average(Cast(attr: AttributeReference, changeDataType: DataType)) =>
    -        list += getField(attr.name,
    +          carbonTable,
    +          newColumnName,
    +          min.prettyName)
    +      case min@Min(expression: Expression) =>
    +        list += getFieldForAggregateExpression(expression,
    +          min.dataType,
    +          carbonTable,
    +          newColumnName,
    +          min.prettyName)
    +      case max@Max(MatchCastExpression(exp: Expression, changeDataType: DataType)) =>
    +        list += getFieldForAggregateExpression(exp,
               changeDataType,
    -          "sum",
    -          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    -          parentTableName,
    -          parentDatabaseName, parentTableId = parentTableId)
    -        list += getField(attr.name,
    +          carbonTable,
    +          newColumnName,
    +          max.prettyName)
    +      case max@Max(expression: Expression) =>
    +        list += getFieldForAggregateExpression(expression,
    +          max.dataType,
    +          carbonTable,
    +          newColumnName,
    +          max.prettyName)
    +      case Average(MatchCastExpression(exp: Expression, changeDataType: DataType)) =>
    +        list += getFieldForAggregateExpression(exp,
               changeDataType,
    -          "count",
    -          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    -          parentTableName,
    -          parentDatabaseName, parentTableId = parentTableId)
    +          carbonTable,
    +          newColumnName,
    +          "sum")
    +        list += getFieldForAggregateExpression(exp,
    +          changeDataType,
    +          carbonTable,
    +          newColumnName,
    +          "count")
    +      case avg@Average(exp: Expression) =>
    +        list += getFieldForAggregateExpression(exp,
    +          avg.dataType,
    +          carbonTable,
    +          newColumnName,
    +          "sum")
    +        list += getFieldForAggregateExpression(exp,
    +          avg.dataType,
    +          carbonTable,
    +          newColumnName,
    +          "count")
           case others@_ =>
             throw new MalformedCarbonCommandException(s"Un-Supported Aggregation Type: ${
               others.prettyName}")
         }
       }
     
    +  /**
    +   * Below method will be used to get the field and its data map field object
    +   * for aggregate expression
    +   * @param expression
    +   *                   expression in aggregate function
    +   * @param dataType
    +   *                 data type
    +   * @param carbonTable
    +   *                    parent carbon table
    +   * @param newColumnName
    +   *                      column name of aggregate table
    +   * @param aggregationName
    +   *                        aggregate function name
    +   * @return field and its metadata tuple
    +   */
    +  def getFieldForAggregateExpression(expression: Expression,
    +      dataType: DataType,
    +      carbonTable: CarbonTable,
    +      newColumnName: String,
    +      aggregationName: String): (Field, DataMapField) = {
    +    val parentColumnsName = new ArrayBuffer[String]()
    +    expression.transform {
    +      case attr: AttributeReference =>
    +        parentColumnsName += attr.name
    +        attr
    +    }
    +    val arrayBuffer = new ArrayBuffer[ColumnTableRelation]()
    +    parentColumnsName.foreach { name =>
    +      arrayBuffer += getColumnRelation(name,
    +        carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
    +        carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
    +        carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
    +        carbonTable)
    +    }
    +    // if parent column relation is of size more than one that means aggregate table
    +    // column is derived from multiple column of main table
    +    // or if expression is not a instance of attribute reference
    +    // then use column name which is passed
    +    val columnName =
    +    if (parentColumnsName.size > 1 && !expression.isInstanceOf[AttributeReference]) {
    +      newColumnName
    +    } else {
    +      expression.asInstanceOf[AttributeReference].name
    +    }
    +    getField(columnName,
    --- End diff --
   
    Can you rename `getField` to `createField`, since it creates a new `Field` object?
    Please rename other similar functions as well, such as `getFieldForAggregateExpression`.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1694: [CARBONDATA-1925][Pre-Aggregate]Added code to...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1694#discussion_r158933818
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -166,127 +208,160 @@ object PreAggregateUtil {
           aggFunctions: AggregateFunction,
           parentTableName: String,
           parentDatabaseName: String,
    -      parentTableId: String) : scala.collection.mutable.ListBuffer[(Field, DataMapField)] = {
    +      parentTableId: String,
    +      newColumnName: String) : scala.collection.mutable.ListBuffer[(Field, DataMapField)] = {
         val list = scala.collection.mutable.ListBuffer.empty[(Field, DataMapField)]
         aggFunctions match {
    -      case sum@Sum(attr: AttributeReference) =>
    -        list += getField(attr.name,
    -          attr.dataType,
    -          sum.prettyName,
    -          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    -          parentTableName,
    -          parentDatabaseName, parentTableId = parentTableId)
    -      case sum@Sum(Cast(attr: AttributeReference, changeDataType: DataType)) =>
    -        list += getField(attr.name,
    +      case sum@Sum(MatchCastExpression(exp: Expression, changeDataType: DataType)) =>
    +        list += getFieldForAggregateExpression(exp,
               changeDataType,
    -          sum.prettyName,
    -          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    -          parentTableName,
    -          parentDatabaseName, parentTableId = parentTableId)
    -      case count@Count(Seq(attr: AttributeReference)) =>
    -        list += getField(attr.name,
    -          attr.dataType,
    -          count.prettyName,
    -          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    -          parentTableName,
    -          parentDatabaseName, parentTableId = parentTableId)
    -      case count@Count(Seq(Cast(attr: AttributeReference, _))) =>
    -        list += getField(attr.name,
    -          attr.dataType,
    -          count.prettyName,
    -          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    -          parentTableName,
    -          parentDatabaseName, parentTableId = parentTableId)
    -      case min@Min(attr: AttributeReference) =>
    -        list += getField(attr.name,
    -          attr.dataType,
    -          min.prettyName,
    -          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    -          parentTableName,
    -          parentDatabaseName, parentTableId = parentTableId)
    -      case min@Min(Cast(attr: AttributeReference, changeDataType: DataType)) =>
    -        list += getField(attr.name,
    +          carbonTable,
    +          newColumnName,
    +          sum.prettyName)
    +      case sum@Sum(exp: Expression) =>
    +        list += getFieldForAggregateExpression(exp,
    +          sum.dataType,
    +          carbonTable,
    +          newColumnName,
    +          sum.prettyName)
    +      case count@Count(Seq(MatchCastExpression(exp: Expression, changeDataType: DataType))) =>
    +        list += getFieldForAggregateExpression(exp,
               changeDataType,
    -          min.prettyName,
    -          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    -          parentTableName,
    -          parentDatabaseName, parentTableId = parentTableId)
    -      case max@Max(attr: AttributeReference) =>
    -        list += getField(attr.name,
    -          attr.dataType,
    -          max.prettyName,
    -          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    -          parentTableName,
    -          parentDatabaseName, parentTableId = parentTableId)
    -      case max@Max(Cast(attr: AttributeReference, changeDataType: DataType)) =>
    -        list += getField(attr.name,
    +          carbonTable,
    +          newColumnName,
    +          count.prettyName)
    +      case count@Count(Seq(expression: Expression)) =>
    +        list += getFieldForAggregateExpression(expression,
    +          count.dataType,
    +          carbonTable,
    +          newColumnName,
    +          count.prettyName)
    +      case min@Min(MatchCastExpression(exp: Expression, changeDataType: DataType)) =>
    +        list += getFieldForAggregateExpression(exp,
               changeDataType,
    -          max.prettyName,
    -          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    -          parentTableName,
    -          parentDatabaseName, parentTableId = parentTableId)
    -      case Average(attr: AttributeReference) =>
    -        list += getField(attr.name,
    -          attr.dataType,
    -          "sum",
    -          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    -          parentTableName,
    -          parentDatabaseName, parentTableId = parentTableId)
    -        list += getField(attr.name,
    -          attr.dataType,
    -          "count",
    -          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    -          parentTableName,
    -          parentDatabaseName, parentTableId = parentTableId)
    -      case Average(Cast(attr: AttributeReference, changeDataType: DataType)) =>
    -        list += getField(attr.name,
    +          carbonTable,
    +          newColumnName,
    +          min.prettyName)
    +      case min@Min(expression: Expression) =>
    +        list += getFieldForAggregateExpression(expression,
    +          min.dataType,
    +          carbonTable,
    +          newColumnName,
    +          min.prettyName)
    +      case max@Max(MatchCastExpression(exp: Expression, changeDataType: DataType)) =>
    +        list += getFieldForAggregateExpression(exp,
               changeDataType,
    -          "sum",
    -          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    -          parentTableName,
    -          parentDatabaseName, parentTableId = parentTableId)
    -        list += getField(attr.name,
    +          carbonTable,
    +          newColumnName,
    +          max.prettyName)
    +      case max@Max(expression: Expression) =>
    +        list += getFieldForAggregateExpression(expression,
    +          max.dataType,
    +          carbonTable,
    +          newColumnName,
    +          max.prettyName)
    +      case Average(MatchCastExpression(exp: Expression, changeDataType: DataType)) =>
    +        list += getFieldForAggregateExpression(exp,
               changeDataType,
    -          "count",
    -          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
    -          parentTableName,
    -          parentDatabaseName, parentTableId = parentTableId)
    +          carbonTable,
    +          newColumnName,
    +          "sum")
    +        list += getFieldForAggregateExpression(exp,
    +          changeDataType,
    +          carbonTable,
    +          newColumnName,
    +          "count")
    +      case avg@Average(exp: Expression) =>
    +        list += getFieldForAggregateExpression(exp,
    +          avg.dataType,
    +          carbonTable,
    +          newColumnName,
    +          "sum")
    +        list += getFieldForAggregateExpression(exp,
    +          avg.dataType,
    +          carbonTable,
    +          newColumnName,
    +          "count")
           case others@_ =>
             throw new MalformedCarbonCommandException(s"Un-Supported Aggregation Type: ${
               others.prettyName}")
         }
       }
     
    +  /**
    +   * Below method will be used to get the field and its data map field object
    +   * for aggregate expression
    +   * @param expression
    +   *                   expression in aggregate function
    +   * @param dataType
    +   *                 data type
    +   * @param carbonTable
    +   *                    parent carbon table
    +   * @param newColumnName
    +   *                      column name of aggregate table
    +   * @param aggregationName
    +   *                        aggregate function name
    +   * @return field and its metadata tuple
    +   */
    +  def getFieldForAggregateExpression(expression: Expression,
    --- End diff --
   
    Move the first parameter to the next line, and please follow this convention in the future.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1694: [CARBONDATA-1925][Pre-Aggregate]Added code to...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1694#discussion_r158933879
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -126,19 +126,33 @@ object PreAggregateUtil {
               attr.aggregateFunction,
               parentTableName,
               parentDatabaseName,
    -          parentTableId)
    +          parentTableId,
    +          "column_" + counter)
    +        counter = counter + 1
           case attr: AttributeReference =>
    +        val columnRelation = getColumnRelation(attr.name,
    +          parentTableId,
    +          parentTableName,
    +          parentDatabaseName,
    +          carbonTable)
    +        val arrayBuffer = new ArrayBuffer[ColumnTableRelation]()
    +        arrayBuffer += columnRelation
             fieldToDataMapFieldMap += getField(attr.name,
               attr.dataType,
    -          parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
               parentTableName = parentTableName,
    -          parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
    +          columnTableRelationList = arrayBuffer.toList)
           case Alias(attr: AttributeReference, _) =>
    +        val columnRelation = getColumnRelation(attr.name,
    +          parentTableId,
    +          parentTableName,
    +          parentDatabaseName,
    +          carbonTable)
    +        val arrayBuffer = new ArrayBuffer[ColumnTableRelation]()
    --- End diff --
   
    This `ArrayBuffer` is not needed after changing `List` to `Seq`.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1694: [CARBONDATA-1925][Pre-Aggregate]Added code to...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1694#discussion_r158934001
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -508,7 +583,10 @@ object PreAggregateUtil {
         val headers = dataMapSchemas.find(_.getChildSchema.getTableName.equalsIgnoreCase(
           dataMapIdentifier.table)) match {
           case Some(dataMapSchema) =>
    -        dataMapSchema.getChildSchema.getListOfColumns.asScala.sortBy(_.getSchemaOrdinal).map(
    +        val columns = dataMapSchema.getChildSchema.getListOfColumns.asScala
    +          .filter{f =>
    --- End diff --
   
    Change `f` to a meaningful variable name, and add a space before `{`.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1694: [CARBONDATA-1925][Pre-Aggregate]Added code to...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kumarvishal09 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1694#discussion_r158935184
 
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala ---
    @@ -0,0 +1,44 @@
    +package org.apache.carbondata.integration.spark.testsuite.preaggregate
    +
    +import org.apache.spark.sql.test.util.QueryTest
    +import org.scalatest.BeforeAndAfterAll
    +
    +class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll {
    +
    +  override def beforeAll: Unit = {
    +    sql("drop table if exists mainTable")
    +    sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
    +    sql("create datamap agg0 on table mainTable using 'preaggregate' as select name,count(age) from mainTable group by name")
    +    sql("create datamap agg1 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end) from mainTable group by name")
    +    sql("create datamap agg2 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end),city from mainTable group by name,city")
    +    sql("create datamap agg3 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end) from mainTable group by name")
    +    sql("create datamap agg4 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from mainTable group by name")
    +    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
    +  }
    +
    +  test("test pre agg create table with expression 1") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0"), true, "maintable_age_count")
    +  }
    +
    +  test("test pre agg create table with expression 2") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg1"), true, "maintable_column_0_sum")
    +  }
    +
    +  test("test pre agg create table with expression 3") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg2"), true, "maintable_column_0_sum")
    +  }
    +
    +  test("test pre agg create table with expression 4") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg3"), true, "maintable_column_0_sum")
    --- End diff --
   
    This PR only adds create and load support for expressions inside aggregate functions. Verifying the query results, and which pre-aggregate table gets selected, will be done as part of a different PR.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1694: [CARBONDATA-1925][Pre-Aggregate]Added code to...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kumarvishal09 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1694#discussion_r158935305
 
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala ---
    @@ -0,0 +1,44 @@
    +package org.apache.carbondata.integration.spark.testsuite.preaggregate
    +
    +import org.apache.spark.sql.test.util.QueryTest
    +import org.scalatest.BeforeAndAfterAll
    +
    +class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll {
    +
    +  override def beforeAll: Unit = {
    +    sql("drop table if exists mainTable")
    +    sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
    +    sql("create datamap agg0 on table mainTable using 'preaggregate' as select name,count(age) from mainTable group by name")
    +    sql("create datamap agg1 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end) from mainTable group by name")
    +    sql("create datamap agg2 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end),city from mainTable group by name,city")
    +    sql("create datamap agg3 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end) from mainTable group by name")
    +    sql("create datamap agg4 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from mainTable group by name")
    +    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
    +  }
    +
    +  test("test pre agg create table with expression 1") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0"), true, "maintable_age_count")
    +  }
    +
    +  test("test pre agg create table with expression 2") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg1"), true, "maintable_column_0_sum")
    +  }
    +
    +  test("test pre agg create table with expression 3") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg2"), true, "maintable_column_0_sum")
    +  }
    +
    +  test("test pre agg create table with expression 4") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg3"), true, "maintable_column_0_sum")
    +  }
    +
    +  test("test pre agg create table with expression 5") {
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_0_sum")
    +    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_1_sum")
    +  }
    +
    --- End diff --
   
    Test cases where the filter column is part of the group-by columns are already present in TestPreAggregateTableSelection.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1694: [CARBONDATA-1925][Pre-Aggregate]Added code to...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kumarvishal09 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1694#discussion_r158948738
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -126,19 +126,33 @@ object PreAggregateUtil {
               attr.aggregateFunction,
               parentTableName,
               parentDatabaseName,
    -          parentTableId)
    +          parentTableId,
    +          "column_" + counter)
    +        counter = counter + 1
           case attr: AttributeReference =>
    +        val columnRelation = getColumnRelation(attr.name,
    +          parentTableId,
    +          parentTableName,
    +          parentDatabaseName,
    +          carbonTable)
    +        val arrayBuffer = new ArrayBuffer[ColumnTableRelation]()
    +        arrayBuffer += columnRelation
             fieldToDataMapFieldMap += getField(attr.name,
               attr.dataType,
    -          parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
               parentTableName = parentTableName,
    -          parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
    +          columnTableRelationList = arrayBuffer.toList)
           case Alias(attr: AttributeReference, _) =>
    +        val columnRelation = getColumnRelation(attr.name,
    +          parentTableId,
    +          parentTableName,
    +          parentDatabaseName,
    +          carbonTable)
    +        val arrayBuffer = new ArrayBuffer[ColumnTableRelation]()
    --- End diff --
   
    fixed


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1694: [CARBONDATA-1925][Pre-Aggregate]Added code to...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kumarvishal09 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1694#discussion_r158948765
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -166,127 +208,160 @@ object PreAggregateUtil {
           aggFunctions: AggregateFunction,
           parentTableName: String,
           parentDatabaseName: String,
    -      parentTableId: String) : scala.collection.mutable.ListBuffer[(Field, DataMapField)] = {
    +      parentTableId: String,
    +      newColumnName: String) : scala.collection.mutable.ListBuffer[(Field, DataMapField)] = {
    --- End diff --
   
    fixed


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1694: [CARBONDATA-1925][Pre-Aggregate]Added code to...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user kumarvishal09 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1694#discussion_r158948779
 
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala ---
    @@ -0,0 +1,44 @@
    +package org.apache.carbondata.integration.spark.testsuite.preaggregate
    --- End diff --
   
    fixed


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #1694: [CARBONDATA-1925][Pre-Aggregate]Added code to suppor...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1694
 
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2414/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #1694: [CARBONDATA-1925][Pre-Aggregate]Added code to suppor...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1694
 
    Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1193/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #1694: [CARBONDATA-1925][Pre-Aggregate]Added code to suppor...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1694
 
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2609/



---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata issue #1694: [CARBONDATA-1925][Pre-Aggregate]Added code to suppor...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on the issue:

    https://github.com/apache/carbondata/pull/1694
 
    retest this please


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1694: [CARBONDATA-1925][Pre-Aggregate]Added code to...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1694#discussion_r159032529
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -116,29 +115,49 @@ object PreAggregateUtil {
           throw new MalformedCarbonCommandException(
             "Pre Aggregation is not supported on Pre-Aggregated Table")
         }
    +    var counter = 0
         aggExp.map {
    -      case Alias(attr: AggregateExpression, _) =>
    +      case Alias(attr: AggregateExpression, name) =>
             if (attr.isDistinct) {
               throw new MalformedCarbonCommandException(
                 "Distinct is not supported On Pre Aggregation")
             }
    -        fieldToDataMapFieldMap ++= validateAggregateFunctionAndGetFields(carbonTable,
    +        fieldToDataMapFieldMap ++= validateAggregateFunctionAndGetFields(
    +          carbonTable,
               attr.aggregateFunction,
               parentTableName,
               parentDatabaseName,
    -          parentTableId)
    +          parentTableId,
    +          "column_" + counter)
    +        counter = counter + 1
           case attr: AttributeReference =>
    -        fieldToDataMapFieldMap += getField(attr.name,
    +        val columnRelation = getColumnRelation(
    +          attr.name,
    +          parentTableId,
    +          parentTableName,
    +          parentDatabaseName,
    +          carbonTable)
    +        val arrayBuffer = new ArrayBuffer[ColumnTableRelation]()
    +        arrayBuffer += columnRelation
    +        fieldToDataMapFieldMap += createField(
    +          attr.name,
               attr.dataType,
    -          parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
               parentTableName = parentTableName,
    -          parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
    +          columnTableRelationList = arrayBuffer.toList)
    --- End diff --
   
    Change this to `columnTableRelationList = Seq(columnRelation)`.


---
Reply | Threaded
Open this post in threaded view
|

[GitHub] carbondata pull request #1694: [CARBONDATA-1925][Pre-Aggregate]Added code to...

qiuchenjian-2
In reply to this post by qiuchenjian-2
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1694#discussion_r159032604
 
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
    @@ -116,29 +115,49 @@ object PreAggregateUtil {
           throw new MalformedCarbonCommandException(
             "Pre Aggregation is not supported on Pre-Aggregated Table")
         }
    +    var counter = 0
         aggExp.map {
    -      case Alias(attr: AggregateExpression, _) =>
    +      case Alias(attr: AggregateExpression, name) =>
             if (attr.isDistinct) {
               throw new MalformedCarbonCommandException(
                 "Distinct is not supported On Pre Aggregation")
             }
    -        fieldToDataMapFieldMap ++= validateAggregateFunctionAndGetFields(carbonTable,
    +        fieldToDataMapFieldMap ++= validateAggregateFunctionAndGetFields(
    +          carbonTable,
               attr.aggregateFunction,
               parentTableName,
               parentDatabaseName,
    -          parentTableId)
    +          parentTableId,
    +          "column_" + counter)
    +        counter = counter + 1
           case attr: AttributeReference =>
    -        fieldToDataMapFieldMap += getField(attr.name,
    +        val columnRelation = getColumnRelation(
    +          attr.name,
    +          parentTableId,
    +          parentTableName,
    +          parentDatabaseName,
    +          carbonTable)
    +        val arrayBuffer = new ArrayBuffer[ColumnTableRelation]()
    +        arrayBuffer += columnRelation
    +        fieldToDataMapFieldMap += createField(
    +          attr.name,
               attr.dataType,
    -          parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
               parentTableName = parentTableName,
    -          parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
    +          columnTableRelationList = arrayBuffer.toList)
           case Alias(attr: AttributeReference, _) =>
    --- End diff --
   
    This case is the same as the previous one, so the two can be merged.


---
123