Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1694#discussion_r158932166

--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala ---
@@ -0,0 +1,44 @@
+package org.apache.carbondata.integration.spark.testsuite.preaggregate
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll: Unit = {
+    sql("drop table if exists mainTable")
+    sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
+    sql("create datamap agg0 on table mainTable using 'preaggregate' as select name,count(age) from mainTable group by name")
+    sql("create datamap agg1 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end) from mainTable group by name")
+    sql("create datamap agg2 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end),city from mainTable group by name,city")
+    sql("create datamap agg3 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end) from mainTable group by name")
+    sql("create datamap agg4 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from mainTable group by name")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
+  }
+
+  test("test pre agg create table with expression 1") {
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0"), true, "maintable_age_count")
+  }
+
+  test("test pre agg create table with expression 2") {
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg1"), true, "maintable_column_0_sum")
+  }
+
+  test("test pre agg create table with expression 3") {
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg2"), true, "maintable_column_0_sum")
+  }
+
+  test("test pre agg create table with expression 4") {
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg3"), true, "maintable_column_0_sum")
--- End diff --

I think you need to verify that the select query will hit the pre-aggregate table, not just check that it exists.

---
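A minimal sketch of what such a check might look like, using only standard Spark plan APIs; the expected child-table name (maintable_agg1) and the exact rewrite behaviour are assumptions here, not something verified against the PR:

  test("query with expression should hit the pre-aggregate table") {
    val df = sql(
      "select name, sum(case when age=35 then id else 0 end) from mainTable group by name")
    // After the pre-aggregate rewrite, the physical plan should scan the
    // child (pre-aggregate) table rather than only the main table.
    assert(df.queryExecution.executedPlan.toString().contains("maintable_agg1"))
  }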
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1694#discussion_r158932405 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala --- @@ -166,127 +208,160 @@ object PreAggregateUtil { aggFunctions: AggregateFunction, parentTableName: String, parentDatabaseName: String, - parentTableId: String) : scala.collection.mutable.ListBuffer[(Field, DataMapField)] = { + parentTableId: String, + newColumnName: String) : scala.collection.mutable.ListBuffer[(Field, DataMapField)] = { --- End diff -- What is this field? Please add a comment describing this function. ---
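For example, a short Scaladoc on the function describing the new parameter. The wording below is purely illustrative, not the comment that was actually added:

  /**
   * Validates the given aggregate function and returns the (Field, DataMapField)
   * pairs to be created in the pre-aggregate table.
   *
   * @param newColumnName column name to use in the aggregate table when the
   *                      aggregate is computed over an expression rather than
   *                      a plain parent-table column
   */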
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1694#discussion_r158932436 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala --- @@ -166,127 +208,160 @@ object PreAggregateUtil { aggFunctions: AggregateFunction, parentTableName: String, parentDatabaseName: String, - parentTableId: String) : scala.collection.mutable.ListBuffer[(Field, DataMapField)] = { + parentTableId: String, + newColumnName: String) : scala.collection.mutable.ListBuffer[(Field, DataMapField)] = { val list = scala.collection.mutable.ListBuffer.empty[(Field, DataMapField)] aggFunctions match { - case sum@Sum(attr: AttributeReference) => - list += getField(attr.name, - attr.dataType, - sum.prettyName, - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case sum@Sum(Cast(attr: AttributeReference, changeDataType: DataType)) => - list += getField(attr.name, + case sum@Sum(MatchCastExpression(exp: Expression, changeDataType: DataType)) => + list += getFieldForAggregateExpression(exp, changeDataType, - sum.prettyName, - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case count@Count(Seq(attr: AttributeReference)) => - list += getField(attr.name, - attr.dataType, - count.prettyName, - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case count@Count(Seq(Cast(attr: AttributeReference, _))) => - list += getField(attr.name, - attr.dataType, - count.prettyName, - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case min@Min(attr: AttributeReference) => - list += getField(attr.name, - attr.dataType, - min.prettyName, - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case min@Min(Cast(attr: AttributeReference, changeDataType: DataType)) => - list += getField(attr.name, + carbonTable, + newColumnName, + sum.prettyName) + case sum@Sum(exp: Expression) => + list += getFieldForAggregateExpression(exp, --- End diff -- move first parameter to next line --- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1694#discussion_r158932905 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala --- @@ -126,19 +126,33 @@ object PreAggregateUtil { attr.aggregateFunction, parentTableName, parentDatabaseName, - parentTableId) + parentTableId, + "column_" + counter) + counter = counter + 1 case attr: AttributeReference => + val columnRelation = getColumnRelation(attr.name, + parentTableId, + parentTableName, + parentDatabaseName, + carbonTable) + val arrayBuffer = new ArrayBuffer[ColumnTableRelation]() + arrayBuffer += columnRelation fieldToDataMapFieldMap += getField(attr.name, attr.dataType, - parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, parentTableName = parentTableName, - parentDatabaseName = parentDatabaseName, parentTableId = parentTableId) + columnTableRelationList = arrayBuffer.toList) case Alias(attr: AttributeReference, _) => + val columnRelation = getColumnRelation(attr.name, --- End diff -- I think `getColumnRelation` is not needed; you can create a new ColumnTableRelation directly here, since the parameters are almost the same. ---
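A rough sketch of the inline construction being suggested. The ColumnTableRelation field names below are guesses inferred from the surrounding diff, not verified against the actual case class:

  case attr: AttributeReference =>
    // build the relation directly instead of going through getColumnRelation
    val columnRelation = ColumnTableRelation(
      parentColumnName = attr.name,
      parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
      parentTableName = parentTableName,
      parentDatabaseName = parentDatabaseName,
      parentTableId = parentTableId)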
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1694#discussion_r158933435 --- Diff: integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala --- @@ -79,7 +79,7 @@ case class Field(column: String, var dataType: Option[String], name: Option[Stri } case class DataMapField(var aggregateFunction: String = "", - columnTableRelation: Option[ColumnTableRelation] = None) { + columnTableRelationList: Option[List[ColumnTableRelation]] = None) { --- End diff -- Use Seq instead of List --- |
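Applied to the declaration quoted above, the suggested change would read roughly as follows (sketch only, not the committed code):

  case class DataMapField(var aggregateFunction: String = "",
      columnTableRelationList: Option[Seq[ColumnTableRelation]] = None) {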
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1694#discussion_r158933775 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala --- @@ -166,127 +208,160 @@ object PreAggregateUtil { aggFunctions: AggregateFunction, parentTableName: String, parentDatabaseName: String, - parentTableId: String) : scala.collection.mutable.ListBuffer[(Field, DataMapField)] = { + parentTableId: String, + newColumnName: String) : scala.collection.mutable.ListBuffer[(Field, DataMapField)] = { val list = scala.collection.mutable.ListBuffer.empty[(Field, DataMapField)] aggFunctions match { - case sum@Sum(attr: AttributeReference) => - list += getField(attr.name, - attr.dataType, - sum.prettyName, - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case sum@Sum(Cast(attr: AttributeReference, changeDataType: DataType)) => - list += getField(attr.name, + case sum@Sum(MatchCastExpression(exp: Expression, changeDataType: DataType)) => + list += getFieldForAggregateExpression(exp, changeDataType, - sum.prettyName, - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case count@Count(Seq(attr: AttributeReference)) => - list += getField(attr.name, - attr.dataType, - count.prettyName, - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case count@Count(Seq(Cast(attr: AttributeReference, _))) => - list += getField(attr.name, - attr.dataType, - count.prettyName, - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case min@Min(attr: AttributeReference) => - list += getField(attr.name, - attr.dataType, - min.prettyName, - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case min@Min(Cast(attr: AttributeReference, changeDataType: DataType)) => - list += getField(attr.name, + carbonTable, + newColumnName, + sum.prettyName) + case sum@Sum(exp: Expression) => + list += getFieldForAggregateExpression(exp, + sum.dataType, + carbonTable, + newColumnName, + sum.prettyName) + case count@Count(Seq(MatchCastExpression(exp: Expression, changeDataType: DataType))) => + list += getFieldForAggregateExpression(exp, changeDataType, - min.prettyName, - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case max@Max(attr: AttributeReference) => - list += getField(attr.name, - attr.dataType, - max.prettyName, - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case max@Max(Cast(attr: AttributeReference, changeDataType: DataType)) => - list += getField(attr.name, + carbonTable, + newColumnName, + count.prettyName) + case count@Count(Seq(expression: Expression)) => + list += getFieldForAggregateExpression(expression, + count.dataType, + carbonTable, + newColumnName, + count.prettyName) + case min@Min(MatchCastExpression(exp: Expression, changeDataType: DataType)) => + list += getFieldForAggregateExpression(exp, changeDataType, - max.prettyName, - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - 
parentDatabaseName, parentTableId = parentTableId) - case Average(attr: AttributeReference) => - list += getField(attr.name, - attr.dataType, - "sum", - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - list += getField(attr.name, - attr.dataType, - "count", - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case Average(Cast(attr: AttributeReference, changeDataType: DataType)) => - list += getField(attr.name, + carbonTable, + newColumnName, + min.prettyName) + case min@Min(expression: Expression) => + list += getFieldForAggregateExpression(expression, + min.dataType, + carbonTable, + newColumnName, + min.prettyName) + case max@Max(MatchCastExpression(exp: Expression, changeDataType: DataType)) => + list += getFieldForAggregateExpression(exp, changeDataType, - "sum", - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - list += getField(attr.name, + carbonTable, + newColumnName, + max.prettyName) + case max@Max(expression: Expression) => + list += getFieldForAggregateExpression(expression, + max.dataType, + carbonTable, + newColumnName, + max.prettyName) + case Average(MatchCastExpression(exp: Expression, changeDataType: DataType)) => + list += getFieldForAggregateExpression(exp, changeDataType, - "count", - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) + carbonTable, + newColumnName, + "sum") + list += getFieldForAggregateExpression(exp, + changeDataType, + carbonTable, + newColumnName, + "count") + case avg@Average(exp: Expression) => + list += getFieldForAggregateExpression(exp, + avg.dataType, + carbonTable, + newColumnName, + "sum") + list += getFieldForAggregateExpression(exp, + avg.dataType, + carbonTable, + newColumnName, + "count") case others@_ => throw new MalformedCarbonCommandException(s"Un-Supported Aggregation Type: ${ others.prettyName}") } } + /** + * Below method will be used to get the field and its data map field object + * for aggregate expression + * @param expression + * expression in aggregate function + * @param dataType + * data type + * @param carbonTable + * parent carbon table + * @param newColumnName + * column name of aggregate table + * @param aggregationName + * aggregate function name + * @return field and its metadata tuple + */ + def getFieldForAggregateExpression(expression: Expression, + dataType: DataType, + carbonTable: CarbonTable, + newColumnName: String, + aggregationName: String): (Field, DataMapField) = { + val parentColumnsName = new ArrayBuffer[String]() + expression.transform { + case attr: AttributeReference => + parentColumnsName += attr.name + attr + } + val arrayBuffer = new ArrayBuffer[ColumnTableRelation]() + parentColumnsName.foreach { name => + arrayBuffer += getColumnRelation(name, + carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId, + carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName, + carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName, + carbonTable) + } + // if parent column relation is of size more than one that means aggregate table + // column is derived from multiple column of main table + // or if expression is not a instance of attribute reference + // then use column name which 
is passed + val columnName = + if (parentColumnsName.size > 1 && !expression.isInstanceOf[AttributeReference]) { + newColumnName + } else { + expression.asInstanceOf[AttributeReference].name + } + getField(columnName, --- End diff -- Can you rename `getField` to `createField`, since it is creating a new Field object? Please rename other similar functions as well, like `getFieldForAggregateExpression`. ---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1694#discussion_r158933818 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala --- @@ -166,127 +208,160 @@ object PreAggregateUtil { aggFunctions: AggregateFunction, parentTableName: String, parentDatabaseName: String, - parentTableId: String) : scala.collection.mutable.ListBuffer[(Field, DataMapField)] = { + parentTableId: String, + newColumnName: String) : scala.collection.mutable.ListBuffer[(Field, DataMapField)] = { val list = scala.collection.mutable.ListBuffer.empty[(Field, DataMapField)] aggFunctions match { - case sum@Sum(attr: AttributeReference) => - list += getField(attr.name, - attr.dataType, - sum.prettyName, - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case sum@Sum(Cast(attr: AttributeReference, changeDataType: DataType)) => - list += getField(attr.name, + case sum@Sum(MatchCastExpression(exp: Expression, changeDataType: DataType)) => + list += getFieldForAggregateExpression(exp, changeDataType, - sum.prettyName, - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case count@Count(Seq(attr: AttributeReference)) => - list += getField(attr.name, - attr.dataType, - count.prettyName, - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case count@Count(Seq(Cast(attr: AttributeReference, _))) => - list += getField(attr.name, - attr.dataType, - count.prettyName, - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case min@Min(attr: AttributeReference) => - list += getField(attr.name, - attr.dataType, - min.prettyName, - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case min@Min(Cast(attr: AttributeReference, changeDataType: DataType)) => - list += getField(attr.name, + carbonTable, + newColumnName, + sum.prettyName) + case sum@Sum(exp: Expression) => + list += getFieldForAggregateExpression(exp, + sum.dataType, + carbonTable, + newColumnName, + sum.prettyName) + case count@Count(Seq(MatchCastExpression(exp: Expression, changeDataType: DataType))) => + list += getFieldForAggregateExpression(exp, changeDataType, - min.prettyName, - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case max@Max(attr: AttributeReference) => - list += getField(attr.name, - attr.dataType, - max.prettyName, - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case max@Max(Cast(attr: AttributeReference, changeDataType: DataType)) => - list += getField(attr.name, + carbonTable, + newColumnName, + count.prettyName) + case count@Count(Seq(expression: Expression)) => + list += getFieldForAggregateExpression(expression, + count.dataType, + carbonTable, + newColumnName, + count.prettyName) + case min@Min(MatchCastExpression(exp: Expression, changeDataType: DataType)) => + list += getFieldForAggregateExpression(exp, changeDataType, - max.prettyName, - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - 
parentDatabaseName, parentTableId = parentTableId) - case Average(attr: AttributeReference) => - list += getField(attr.name, - attr.dataType, - "sum", - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - list += getField(attr.name, - attr.dataType, - "count", - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case Average(Cast(attr: AttributeReference, changeDataType: DataType)) => - list += getField(attr.name, + carbonTable, + newColumnName, + min.prettyName) + case min@Min(expression: Expression) => + list += getFieldForAggregateExpression(expression, + min.dataType, + carbonTable, + newColumnName, + min.prettyName) + case max@Max(MatchCastExpression(exp: Expression, changeDataType: DataType)) => + list += getFieldForAggregateExpression(exp, changeDataType, - "sum", - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - list += getField(attr.name, + carbonTable, + newColumnName, + max.prettyName) + case max@Max(expression: Expression) => + list += getFieldForAggregateExpression(expression, + max.dataType, + carbonTable, + newColumnName, + max.prettyName) + case Average(MatchCastExpression(exp: Expression, changeDataType: DataType)) => + list += getFieldForAggregateExpression(exp, changeDataType, - "count", - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) + carbonTable, + newColumnName, + "sum") + list += getFieldForAggregateExpression(exp, + changeDataType, + carbonTable, + newColumnName, + "count") + case avg@Average(exp: Expression) => + list += getFieldForAggregateExpression(exp, + avg.dataType, + carbonTable, + newColumnName, + "sum") + list += getFieldForAggregateExpression(exp, + avg.dataType, + carbonTable, + newColumnName, + "count") case others@_ => throw new MalformedCarbonCommandException(s"Un-Supported Aggregation Type: ${ others.prettyName}") } } + /** + * Below method will be used to get the field and its data map field object + * for aggregate expression + * @param expression + * expression in aggregate function + * @param dataType + * data type + * @param carbonTable + * parent carbon table + * @param newColumnName + * column name of aggregate table + * @param aggregationName + * aggregate function name + * @return field and its metadata tuple + */ + def getFieldForAggregateExpression(expression: Expression, --- End diff -- move parameter to next line, please follow this in the future --- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1694#discussion_r158933879 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala --- @@ -126,19 +126,33 @@ object PreAggregateUtil { attr.aggregateFunction, parentTableName, parentDatabaseName, - parentTableId) + parentTableId, + "column_" + counter) + counter = counter + 1 case attr: AttributeReference => + val columnRelation = getColumnRelation(attr.name, + parentTableId, + parentTableName, + parentDatabaseName, + carbonTable) + val arrayBuffer = new ArrayBuffer[ColumnTableRelation]() + arrayBuffer += columnRelation fieldToDataMapFieldMap += getField(attr.name, attr.dataType, - parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, parentTableName = parentTableName, - parentDatabaseName = parentDatabaseName, parentTableId = parentTableId) + columnTableRelationList = arrayBuffer.toList) case Alias(attr: AttributeReference, _) => + val columnRelation = getColumnRelation(attr.name, + parentTableId, + parentTableName, + parentDatabaseName, + carbonTable) + val arrayBuffer = new ArrayBuffer[ColumnTableRelation]() --- End diff -- This is not needed after changing List to Seq --- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1694#discussion_r158934001 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala --- @@ -508,7 +583,10 @@ object PreAggregateUtil { val headers = dataMapSchemas.find(_.getChildSchema.getTableName.equalsIgnoreCase( dataMapIdentifier.table)) match { case Some(dataMapSchema) => - dataMapSchema.getChildSchema.getListOfColumns.asScala.sortBy(_.getSchemaOrdinal).map( + val columns = dataMapSchema.getChildSchema.getListOfColumns.asScala + .filter{f => --- End diff -- change `f` to a meaningful variable, add space before `{` --- |
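A generic before/after illustration of the style being requested; the predicate and values are made up and are not the actual filter logic in PreAggregateUtil:

  val names = Seq("maintable_age_count", "maintable_column_0_sum")
  // before: single-letter parameter, no space before '{'
  val sums = names.filter{f => f.endsWith("_sum")}
  // after: descriptive parameter name and a space before '{'
  val sumsReadable = names.filter { columnName => columnName.endsWith("_sum") }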
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1694#discussion_r158935184

--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala ---
@@ -0,0 +1,44 @@
+package org.apache.carbondata.integration.spark.testsuite.preaggregate
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll: Unit = {
+    sql("drop table if exists mainTable")
+    sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
+    sql("create datamap agg0 on table mainTable using 'preaggregate' as select name,count(age) from mainTable group by name")
+    sql("create datamap agg1 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end) from mainTable group by name")
+    sql("create datamap agg2 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end),city from mainTable group by name,city")
+    sql("create datamap agg3 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end) from mainTable group by name")
+    sql("create datamap agg4 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from mainTable group by name")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
+  }
+
+  test("test pre agg create table with expression 1") {
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0"), true, "maintable_age_count")
+  }
+
+  test("test pre agg create table with expression 2") {
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg1"), true, "maintable_column_0_sum")
+  }
+
+  test("test pre agg create table with expression 3") {
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg2"), true, "maintable_column_0_sum")
+  }
+
+  test("test pre agg create table with expression 4") {
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg3"), true, "maintable_column_0_sum")
--- End diff --

This PR only covers create and load support for expressions inside aggregate functions. Verifying the query result, and which pre-aggregate table the query will hit, will be validated as part of a different PR.

---
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1694#discussion_r158935305

--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala ---
@@ -0,0 +1,44 @@
+package org.apache.carbondata.integration.spark.testsuite.preaggregate
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll: Unit = {
+    sql("drop table if exists mainTable")
+    sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
+    sql("create datamap agg0 on table mainTable using 'preaggregate' as select name,count(age) from mainTable group by name")
+    sql("create datamap agg1 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end) from mainTable group by name")
+    sql("create datamap agg2 on table mainTable using 'preaggregate' as select name,sum(case when age=35 then id else 0 end),city from mainTable group by name,city")
+    sql("create datamap agg3 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end) from mainTable group by name")
+    sql("create datamap agg4 on table mainTable using 'preaggregate' as select name,sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from mainTable group by name")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
+  }
+
+  test("test pre agg create table with expression 1") {
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0"), true, "maintable_age_count")
+  }
+
+  test("test pre agg create table with expression 2") {
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg1"), true, "maintable_column_0_sum")
+  }
+
+  test("test pre agg create table with expression 3") {
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg2"), true, "maintable_column_0_sum")
+  }
+
+  test("test pre agg create table with expression 4") {
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg3"), true, "maintable_column_0_sum")
+  }
+
+  test("test pre agg create table with expression 5") {
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_0_sum")
+    checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_1_sum")
+  }
+
--- End diff --

Test cases where the filter column is part of the group-by columns are already present in TestPreAggregateTableSelection.

---
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1694#discussion_r158948738 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala --- @@ -126,19 +126,33 @@ object PreAggregateUtil { attr.aggregateFunction, parentTableName, parentDatabaseName, - parentTableId) + parentTableId, + "column_" + counter) + counter = counter + 1 case attr: AttributeReference => + val columnRelation = getColumnRelation(attr.name, + parentTableId, + parentTableName, + parentDatabaseName, + carbonTable) + val arrayBuffer = new ArrayBuffer[ColumnTableRelation]() + arrayBuffer += columnRelation fieldToDataMapFieldMap += getField(attr.name, attr.dataType, - parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, parentTableName = parentTableName, - parentDatabaseName = parentDatabaseName, parentTableId = parentTableId) + columnTableRelationList = arrayBuffer.toList) case Alias(attr: AttributeReference, _) => + val columnRelation = getColumnRelation(attr.name, + parentTableId, + parentTableName, + parentDatabaseName, + carbonTable) + val arrayBuffer = new ArrayBuffer[ColumnTableRelation]() --- End diff -- fixed --- |
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1694#discussion_r158948765 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala --- @@ -166,127 +208,160 @@ object PreAggregateUtil { aggFunctions: AggregateFunction, parentTableName: String, parentDatabaseName: String, - parentTableId: String) : scala.collection.mutable.ListBuffer[(Field, DataMapField)] = { + parentTableId: String, + newColumnName: String) : scala.collection.mutable.ListBuffer[(Field, DataMapField)] = { --- End diff -- fixed --- |
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1694#discussion_r158948779 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala --- @@ -0,0 +1,44 @@ +package org.apache.carbondata.integration.spark.testsuite.preaggregate --- End diff -- fixed --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1694 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/2414/ --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1694 Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/1193/ --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1694 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2609/ --- |
Github user jackylk commented on the issue:
https://github.com/apache/carbondata/pull/1694 retest this please --- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1694#discussion_r159032529 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala --- @@ -116,29 +115,49 @@ object PreAggregateUtil { throw new MalformedCarbonCommandException( "Pre Aggregation is not supported on Pre-Aggregated Table") } + var counter = 0 aggExp.map { - case Alias(attr: AggregateExpression, _) => + case Alias(attr: AggregateExpression, name) => if (attr.isDistinct) { throw new MalformedCarbonCommandException( "Distinct is not supported On Pre Aggregation") } - fieldToDataMapFieldMap ++= validateAggregateFunctionAndGetFields(carbonTable, + fieldToDataMapFieldMap ++= validateAggregateFunctionAndGetFields( + carbonTable, attr.aggregateFunction, parentTableName, parentDatabaseName, - parentTableId) + parentTableId, + "column_" + counter) + counter = counter + 1 case attr: AttributeReference => - fieldToDataMapFieldMap += getField(attr.name, + val columnRelation = getColumnRelation( + attr.name, + parentTableId, + parentTableName, + parentDatabaseName, + carbonTable) + val arrayBuffer = new ArrayBuffer[ColumnTableRelation]() + arrayBuffer += columnRelation + fieldToDataMapFieldMap += createField( + attr.name, attr.dataType, - parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, parentTableName = parentTableName, - parentDatabaseName = parentDatabaseName, parentTableId = parentTableId) + columnTableRelationList = arrayBuffer.toList) --- End diff -- change to `columnTableRelationList = Seq(columnRelation)` --- |
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1694#discussion_r159032604 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala --- @@ -116,29 +115,49 @@ object PreAggregateUtil { throw new MalformedCarbonCommandException( "Pre Aggregation is not supported on Pre-Aggregated Table") } + var counter = 0 aggExp.map { - case Alias(attr: AggregateExpression, _) => + case Alias(attr: AggregateExpression, name) => if (attr.isDistinct) { throw new MalformedCarbonCommandException( "Distinct is not supported On Pre Aggregation") } - fieldToDataMapFieldMap ++= validateAggregateFunctionAndGetFields(carbonTable, + fieldToDataMapFieldMap ++= validateAggregateFunctionAndGetFields( + carbonTable, attr.aggregateFunction, parentTableName, parentDatabaseName, - parentTableId) + parentTableId, + "column_" + counter) + counter = counter + 1 case attr: AttributeReference => - fieldToDataMapFieldMap += getField(attr.name, + val columnRelation = getColumnRelation( + attr.name, + parentTableId, + parentTableName, + parentDatabaseName, + carbonTable) + val arrayBuffer = new ArrayBuffer[ColumnTableRelation]() + arrayBuffer += columnRelation + fieldToDataMapFieldMap += createField( + attr.name, attr.dataType, - parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, parentTableName = parentTableName, - parentDatabaseName = parentDatabaseName, parentTableId = parentTableId) + columnTableRelationList = arrayBuffer.toList) case Alias(attr: AttributeReference, _) => --- End diff -- this case is the same as previous one, can be merged --- |
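One way to merge the two branches is to extract the underlying attribute once and share the rest of the body. A standalone sketch, with an invented helper name; the real code would then build the column relation from the returned attribute:

  import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression}

  // Both a bare AttributeReference and an aliased one resolve to the same
  // underlying attribute, so the two match arms can funnel into one code path.
  def underlyingAttribute(exp: Expression): Option[AttributeReference] = exp match {
    case attr: AttributeReference => Some(attr)
    case Alias(attr: AttributeReference, _) => Some(attr)
    case _ => None
  }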