Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1626#discussion_r155279540 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala --- @@ -493,4 +492,102 @@ object PreAggregateUtil { updatedPlan } + /** + * Below method will be used to get the select query when rollup policy is + * applied in case of timeseries table + * @param tableSchema + * main data map schema + * @param selectedDataMapSchema + * selected data map schema for rollup + * @return select query based on rolloup + */ + def createTimeseriesSelectQueryForRollup( + tableSchema: TableSchema, + selectedDataMapSchema: AggregationDataMapSchema): String = { + val aggregateColumns = scala.collection.mutable.ArrayBuffer.empty[String] + val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String] + tableSchema.getListOfColumns.asScala.foreach { + a => if (a.getAggFunction.nonEmpty) { + aggregateColumns += s"${a.getAggFunction match { + case "count" => "sum" + case others@_ => others}}(${selectedDataMapSchema.getAggChildColByParent( + a.getParentColumnTableRelations.get(0).getColumnName, a.getAggFunction).getColumnName})" + } else if (a.getTimeSeriesFunction.nonEmpty) { + groupingExpressions += s"timeseries(${ + selectedDataMapSchema + .getNonAggNonTimeChildColBasedByParent(a.getParentColumnTableRelations. + get(0).getColumnName).getColumnName + } , '${ a.getTimeSeriesFunction }')" + } else { + groupingExpressions += selectedDataMapSchema + .getNonAggNonTimeChildColBasedByParent(a.getParentColumnTableRelations. 
+ get(0).getColumnName).getColumnName + } + } + s"select ${ groupingExpressions.mkString(",") },${ aggregateColumns.mkString(",") + } from ${selectedDataMapSchema.getChildSchema.getTableName } " + + s"group by ${ groupingExpressions.mkString(",") }" + } + + /** + * Below method will be used to creating select query for timeseries + * for lowest level for aggergation like second level, in that case it will + * hit the maintable + * @param tableSchema + * data map schema + * @param parentTableName + * parent schema + * @return select query for loading + */ + def createTimeSeriesSelectQueryFromMain(tableSchema: TableSchema, + parentTableName: String): String = { + val aggregateColumns = scala.collection.mutable.ArrayBuffer.empty[String] + val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String] + tableSchema.getListOfColumns.asScala.foreach { + a => + if (a.getAggFunction.nonEmpty) { + aggregateColumns += + s"${ a.getAggFunction }(${ a.getParentColumnTableRelations.get(0).getColumnName })" + } else if (a.getTimeSeriesFunction.nonEmpty) { + groupingExpressions += + s"timeseries(${ a.getParentColumnTableRelations.get(0).getColumnName },'${ + a + .getTimeSeriesFunction + }')" + } else { + groupingExpressions += a.getParentColumnTableRelations.get(0).getColumnName + } + } + s"select ${ groupingExpressions.mkString(",") },${ + aggregateColumns.mkString(",") + } from ${ parentTableName } group by ${ groupingExpressions.mkString(",") }" + + } + /** + * Below method will be used to select rollup table in case of + * timeseries data map loading + * @param list + * list of timeseries datamap + * @param dataMapSchema + * datamap schema + * @return select table name + */ + def getRollupDataMapNameForTimeSeries( + list: scala.collection.mutable.ListBuffer[AggregationDataMapSchema], + dataMapSchema: AggregationDataMapSchema): Option[AggregationDataMapSchema] = { + if (list.isEmpty) { + None + } else { + val rollupDataMapSchema = 
scala.collection.mutable.ListBuffer.empty[AggregationDataMapSchema] + list.foreach{f => + if (dataMapSchema.canSelectForRollup(f)) { + rollupDataMapSchema += f + } } + if(rollupDataMapSchema.isEmpty) { --- End diff -- Use `lastOption` instead of the if/else. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1626#discussion_r155279800 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUDF.scala --- @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.command.timeseries + +import java.sql.Timestamp + +import org.apache.carbondata.core.preagg.TimeSeriesUDF + +/** + * Time series udf class + */ + +class TimeseriesUDf extends Function2[Timestamp, String, Timestamp] with Serializable{ --- End diff -- Please rename this class to avoid duplicate names --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1626#discussion_r155282425 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -321,6 +313,62 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule } } + /** + * Below method will be used to extract the query columns from + * filter expression + * @param filterExp + * filter expression + * @param set + * query column list + * @param carbonTable + * parent table + * @param tableName + * table name + * @return isvalid filter expression for aggregate + */ + def extractQueryColumnFromFilterExp(filterExp: Expression, + set: scala.collection.mutable.HashSet[QueryColumn], + carbonTable: CarbonTable, tableName: String): Boolean = { + val newFilterList = scala.collection.mutable.HashMap.empty[AttributeReference, String] --- End diff -- Rename this to `map`: it is a `HashMap`, not a list. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1626#discussion_r155282532 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -321,6 +313,62 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule } } + /** + * Below method will be used to extract the query columns from + * filter expression + * @param filterExp + * filter expression + * @param set + * query column list + * @param carbonTable + * parent table + * @param tableName + * table name + * @return isvalid filter expression for aggregate + */ + def extractQueryColumnFromFilterExp(filterExp: Expression, + set: scala.collection.mutable.HashSet[QueryColumn], + carbonTable: CarbonTable, tableName: String): Boolean = { + val newFilterList = scala.collection.mutable.HashMap.empty[AttributeReference, String] + var isValidPlan = true + filterExp.transform { + case attr: AttributeReference => + if (!newFilterList.get(attr).isDefined) { + newFilterList.put(attr, null) + } + attr + case udf@ScalaUDF(_, _, _, _) => + if (udf.function.getClass.getName + .equalsIgnoreCase("org.apache.spark.sql.execution.command.timeseries.TimeseriesUDf") && + carbonTable.hasTimeSeriesDataMap) { + newFilterList.put(udf.children(0).asInstanceOf[AttributeReference], + udf.children(1).asInstanceOf[Literal].value.toString) + } else { + udf.transform { + case attr: AttributeReference => + if (!newFilterList.get(attr).isDefined) { + newFilterList.put(attr, null) + } + attr + } + } + udf + } + newFilterList.foreach { + f => --- End diff -- format properly --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1626#discussion_r155282655 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -321,6 +313,62 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule } } + /** + * Below method will be used to extract the query columns from + * filter expression + * @param filterExp + * filter expression + * @param set + * query column list + * @param carbonTable + * parent table + * @param tableName + * table name + * @return isvalid filter expression for aggregate + */ + def extractQueryColumnFromFilterExp(filterExp: Expression, + set: scala.collection.mutable.HashSet[QueryColumn], + carbonTable: CarbonTable, tableName: String): Boolean = { + val newFilterList = scala.collection.mutable.HashMap.empty[AttributeReference, String] --- End diff -- Add a comment explaining why this is required. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1626#discussion_r155283460 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -725,6 +786,31 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule Alias(aggExp, name)(NamedExpression.newExprId, alias.qualifier).asInstanceOf[NamedExpression] + case alias@Alias(expression: Expression, name) => + val updatedExp = + if (expression.isInstanceOf[ScalaUDF] && + expression.asInstanceOf[ScalaUDF].function.getClass.getName.equalsIgnoreCase( + "org.apache.spark.sql.execution.command.timeseries.TimeseriesUDf")) { + expression.asInstanceOf[ScalaUDF].transform { + case attr: AttributeReference => + val childAttributeReference = getChildAttributeReference(aggDataMapSchema, + attr, + attributes, + timeseriesFunction = --- End diff -- remove it --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1626 Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/525/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1626 Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/529/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1626 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2150/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1626 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1778/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1626 Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/535/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1626 Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1786/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1626 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2157/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1626 SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2160/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1626 SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2167/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1626 Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/542/ --- |
In reply to this post by qiuchenjian-2
Github user kumarvishal09 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1626#discussion_r155429834 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java --- @@ -210,4 +291,45 @@ private void fillAggFunctionColumns(List<ColumnSchema> listOfColumns) { } } + public boolean isTimeseriesDataMap() { + return isTimeseriesDataMap; + } + + /** + * Below method is to support rollup during loading the data in pre aggregate table + * In case of timeseries year level table data loading can be done using month level table or any + * time series level below year level for example day,hour minute, second. + * @TODO need to handle for pre aggregate table without timeseries + * + * @param aggregationDataMapSchema + * @return whether aggregation data map can be selected or not + */ + public boolean canSelectForRollup(AggregationDataMapSchema aggregationDataMapSchema) { + List<ColumnSchema> listOfColumns = childSchema.getListOfColumns(); --- End diff -- handled in query preparation --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1626 Please add a PR description. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1626#discussion_r155437464 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java --- @@ -719,4 +719,34 @@ public long size() throws IOException { } return dataSize + indexSize; } + + /** + * Utility function to check whether table has timseries datamap or not + * @param carbonTable + * @return timeseries data map present + */ + public static boolean hasTimeSeriesDataMap(CarbonTable carbonTable) { --- End diff -- Please move these methods to a utility class. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1626#discussion_r155437483 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java --- @@ -719,4 +719,34 @@ public long size() throws IOException { } return dataSize + indexSize; } + + /** + * Utility function to check whether table has timseries datamap or not + * @param carbonTable + * @return timeseries data map present + */ + public static boolean hasTimeSeriesDataMap(CarbonTable carbonTable) { + List<DataMapSchema> dataMapSchemaList = carbonTable.getTableInfo().getDataMapSchemaList(); + for (DataMapSchema dataMapSchema : dataMapSchemaList) { + if (dataMapSchema instanceof AggregationDataMapSchema) { + return ((AggregationDataMapSchema) dataMapSchema).isTimeseriesDataMap(); + } + } + return false; + } + + /** + * Utility function to check whether table has timseries datamap or not + * @param carbonTable + * @return timeseries data map present + */ + public static boolean hasAggregationDataMap(CarbonTable carbonTable) { --- End diff -- Please move to utility --- |
Free forum by Nabble | Edit this page |