GitHub user kumarvishal09 opened a pull request:
https://github.com/apache/carbondata/pull/1626 [CARBONDATA-1519][PreAgg-Timeseries] Support Query and Load on timeseries table Support Loading and query on time series pre aggregate table - [ ] Any interfaces changed? No - [ ] Any backward compatibility impacted? No - [ ] Document update required? No - [ ] Testing done Added UT test cases for timeseries table selection and data loading on timeseries table + validation timeseries table data - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. You can merge this pull request into a Git repository by running: $ git pull https://github.com/kumarvishal09/incubator-carbondata Branch_master_Timeseries_5-12_Query Alternatively you can review and apply these changes as the patch at: https://github.com/apache/carbondata/pull/1626.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1626 ---- commit 31ff61fb36336d3cbbf518cde6716894eed17638 Author: kumarvishal <[hidden email]> Date: 2017-12-05T10:30:48Z added support for time series create table commit 9e72145b862553ad34ddced5bfe1282e23561fdd Author: kumarvishal <[hidden email]> Date: 2017-12-05T15:26:58Z Added support for timeseries query ---- --- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1626 Build failed with Spark 2.2.0. Please check CI: http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/515/ --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1626#discussion_r155263571 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java --- @@ -74,7 +86,49 @@ public ColumnSchema getNonAggChildColBasedByParent(String columnName) { Iterator<ColumnSchema> iterator = columnSchemas.iterator(); while (iterator.hasNext()) { ColumnSchema next = iterator.next(); - if (null == next.getAggFunction() || next.getAggFunction().isEmpty()) { + if ((null == next.getAggFunction() || next.getAggFunction().isEmpty()) && null == next + .getTimeSeriesFunction() || next.getTimeSeriesFunction().isEmpty()) { + return next; + } + } + } + return null; + } + + /** + * Below method will be used to get the columns on which aggregate function is not applied + * @param columnName + * parent column name + * @return child column schema + */ + public ColumnSchema getNonAggNonTimeChildColBasedByParent(String columnName) { --- End diff -- Please rename method to `getNonAggTimeChildColBasedByParent` --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1626#discussion_r155263690 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java --- @@ -74,7 +86,49 @@ public ColumnSchema getNonAggChildColBasedByParent(String columnName) { Iterator<ColumnSchema> iterator = columnSchemas.iterator(); while (iterator.hasNext()) { ColumnSchema next = iterator.next(); - if (null == next.getAggFunction() || next.getAggFunction().isEmpty()) { + if ((null == next.getAggFunction() || next.getAggFunction().isEmpty()) && null == next --- End diff -- please rename method to `getNonAggNonTimeChildColBasedByParent` --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1626#discussion_r155264012 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java --- @@ -74,7 +86,49 @@ public ColumnSchema getNonAggChildColBasedByParent(String columnName) { Iterator<ColumnSchema> iterator = columnSchemas.iterator(); while (iterator.hasNext()) { ColumnSchema next = iterator.next(); - if (null == next.getAggFunction() || next.getAggFunction().isEmpty()) { + if ((null == next.getAggFunction() || next.getAggFunction().isEmpty()) && null == next + .getTimeSeriesFunction() || next.getTimeSeriesFunction().isEmpty()) { + return next; + } + } + } + return null; + } + + /** + * Below method will be used to get the columns on which aggregate function is not applied --- End diff -- change the description --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1626#discussion_r155264101 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java --- @@ -74,7 +86,49 @@ public ColumnSchema getNonAggChildColBasedByParent(String columnName) { Iterator<ColumnSchema> iterator = columnSchemas.iterator(); while (iterator.hasNext()) { ColumnSchema next = iterator.next(); - if (null == next.getAggFunction() || next.getAggFunction().isEmpty()) { + if ((null == next.getAggFunction() || next.getAggFunction().isEmpty()) && null == next + .getTimeSeriesFunction() || next.getTimeSeriesFunction().isEmpty()) { + return next; + } + } + } + return null; + } + + /** + * Below method will be used to get the columns on which aggregate function is not applied + * @param columnName + * parent column name + * @return child column schema + */ + public ColumnSchema getNonAggNonTimeChildColBasedByParent(String columnName) { + Set<ColumnSchema> columnSchemas = parentToNonAggChildMapping.get(columnName); + if (null != columnSchemas) { + Iterator<ColumnSchema> iterator = columnSchemas.iterator(); + while (iterator.hasNext()) { + ColumnSchema next = iterator.next(); + if ((null == next.getAggFunction() || next.getAggFunction().isEmpty())) { + return next; + } + } + } + return null; + } + + /** + * Below method will be used to get the columns on which aggregate function is not applied --- End diff -- change description --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1626#discussion_r155265299 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java --- @@ -125,6 +179,25 @@ public ColumnSchema getAggChildColByParent(String columnName, return null; } + /** + * Below method will be used to get the column schema based on parent column name + * @param columName + * parent column name + * @return child column schema + */ + public ColumnSchema getTimeseriesChildColByParent(String columName, String timeseriesFunction) { --- End diff -- I think this duplicates `getChildColByParentColName`; please remove it. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1626#discussion_r155267822 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java --- @@ -210,4 +291,45 @@ private void fillAggFunctionColumns(List<ColumnSchema> listOfColumns) { } } + public boolean isTimeseriesDataMap() { + return isTimeseriesDataMap; + } + + /** + * Below method is to support rollup during loading the data in pre aggregate table + * In case of timeseries year level table data loading can be done using month level table or any + * time series level below year level for example day,hour minute, second. + * @TODO need to handle for pre aggregate table without timeseries + * + * @param aggregationDataMapSchema + * @return whether aggregation data map can be selected or not + */ + public boolean canSelectForRollup(AggregationDataMapSchema aggregationDataMapSchema) { + List<ColumnSchema> listOfColumns = childSchema.getListOfColumns(); --- End diff -- please handle `dummy-measure` scenario --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1626#discussion_r155270004 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java --- @@ -152,6 +154,15 @@ public static CarbonTable buildFromTableInfo(TableInfo tableInfo) { } table.hasDataMapSchema = null != tableInfo.getDataMapSchemaList() && tableInfo.getDataMapSchemaList().size() > 0; + List<DataMapSchema> dataMapSchemaList = tableInfo.getDataMapSchemaList(); + for (DataMapSchema dataMapSchema : dataMapSchemaList) { + if (dataMapSchema instanceof AggregationDataMapSchema) { + if (!table.hasTimeSeriesDataMap) { --- End diff -- Please don't change `CarbonTable` for the timeseries datamap. Add a utility function to decide whether the table has a timeseries datamap. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1626#discussion_r155270121 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java --- @@ -152,6 +154,15 @@ public static CarbonTable buildFromTableInfo(TableInfo tableInfo) { } table.hasDataMapSchema = --- End diff -- Please don't change `CarbonTable` for the aggregation datamap. Add a utility function to decide whether the table has an aggregation datamap. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1626#discussion_r155271321 --- Diff: core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java --- @@ -70,8 +70,15 @@ public AggregateTableSelector(QueryPlan queryPlan, CarbonTable parentTable) { AggregationDataMapSchema aggregationDataMapSchema = (AggregationDataMapSchema) dmSchema; isMatch = true; for (QueryColumn queryColumn : projectionColumn) { - ColumnSchema columnSchemaByParentName = aggregationDataMapSchema - .getNonAggChildColBasedByParent(queryColumn.getColumnSchema().getColumnName()); + ColumnSchema columnSchemaByParentName = null; --- End diff -- Please refactor to remove duplicate code --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1626#discussion_r155271792 --- Diff: core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunction.java --- @@ -21,20 +21,46 @@ * enum for timeseries function */ public enum TimeSeriesFunction { - SECOND("second"), - MINUTE("minute"), - HOUR("hour"), - DAY("day"), - MONTH("month"), - YEAR("year"); + SECOND("second", 0), + MINUTE("minute", 1), + HOUR("hour", 2), + DAY("day", 3), + MONTH("month", 4), + YEAR("year", 5); private String name; - TimeSeriesFunction(String name) { + private int ordinal; + + TimeSeriesFunction(String name, int ordinal) { this.name = name; + this.ordinal = ordinal; } public String getName() { return name; } + + public int getOrdinal() { + return ordinal; + } + +// public static TimeSeriesFunction valueOf(String name) { --- End diff -- remove commented code --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1626#discussion_r155272388 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.integration.spark.testsuite.timeseries + +import java.sql.Timestamp + +import org.apache.spark.sql.Row +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +class TestTimeseriesDataLoad extends QueryTest with BeforeAndAfterAll { + + override def beforeAll: Unit = { + sql("drop table if exists mainTable") + sql("CREATE TABLE mainTable(mytime timestamp, name string, age int) STORED BY 'org.apache.carbondata.format'") + sql("create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES ('timeseries.eventTime'='mytime', 'timeseries.hierarchy'='second=1,minute=1,hour=1,day=1,month=1,year=1') as select mytime, sum(age) from mainTable group by mytime") + sql(s"LOAD DATA LOCAL INPATH 'D:/mydata.csv' into table mainTable") --- End diff -- change path --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1626#discussion_r155275089 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala --- @@ -38,8 +38,20 @@ object LoadPostAggregateListener extends OperationEventListener { val sparkSession = loadEvent.sparkSession val carbonLoadModel = loadEvent.carbonLoadModel val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + // val parentTableName = table.getTableName if (table.hasDataMapSchema) { - for (dataMapSchema: DataMapSchema <- table.getTableInfo.getDataMapSchemaList.asScala) { + // getting all the aggergate datamap schema + val aggregationDataMapList = table.getTableInfo.getDataMapSchemaList.asScala --- End diff -- simplify `table.getTableInfo.getDataMapSchemaList.asScala.filter(_.isInstanceOf[AggregationDataMapSchema]).map(_.asInstanceOf)` --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1626#discussion_r155276144 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala --- @@ -65,6 +95,10 @@ object LoadPostAggregateListener extends OperationEventListener { dataFrame = Some(childDataFrame), internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true")). run(sparkSession) + if (dataMapSchema.isInstanceOf[AggregationDataMapSchema] && --- End diff -- remove this check --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1626#discussion_r155276423 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala --- @@ -51,8 +63,26 @@ object LoadPostAggregateListener extends OperationEventListener { carbonLoadModel.getTableName, "false") val childTableName = dataMapSchema.getRelationIdentifier.getTableName val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName - val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction( - s"${ dataMapSchema.getProperties.get("CHILD_SELECT QUERY") } ")).drop("preAggLoad") + val childDataFrame = + if (!dataMapSchema.asInstanceOf[AggregationDataMapSchema].isTimeseriesDataMap) { --- End diff -- no need to use asInstanceOf --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1626#discussion_r155277594 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala --- @@ -51,8 +63,26 @@ object LoadPostAggregateListener extends OperationEventListener { carbonLoadModel.getTableName, "false") val childTableName = dataMapSchema.getRelationIdentifier.getTableName val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName - val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction( - s"${ dataMapSchema.getProperties.get("CHILD_SELECT QUERY") } ")).drop("preAggLoad") + val childDataFrame = + if (!dataMapSchema.asInstanceOf[AggregationDataMapSchema].isTimeseriesDataMap) { + sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction( + s"${ dataMapSchema.getProperties.get("CHILD_SELECT QUERY") } ")).drop("preAggLoad") + } else { + // for timeseries rollup policy + val tableSelectedForRollup = PreAggregateUtil.getRollupDataMapNameForTimeSeries(list, + dataMapSchema.asInstanceOf[AggregationDataMapSchema]) + // if non of the rollup data map is selected hit the maintable and prepare query + val childQuery = if (!tableSelectedForRollup.isDefined) { --- End diff -- use isEmpty --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1626#discussion_r155278666 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala --- @@ -493,4 +492,102 @@ object PreAggregateUtil { updatedPlan } + /** + * Below method will be used to get the select query when rollup policy is + * applied in case of timeseries table + * @param tableSchema + * main data map schema + * @param selectedDataMapSchema + * selected data map schema for rollup + * @return select query based on rolloup + */ + def createTimeseriesSelectQueryForRollup( + tableSchema: TableSchema, + selectedDataMapSchema: AggregationDataMapSchema): String = { + val aggregateColumns = scala.collection.mutable.ArrayBuffer.empty[String] + val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String] + tableSchema.getListOfColumns.asScala.foreach { + a => if (a.getAggFunction.nonEmpty) { --- End diff -- Move `a =>` up to the previous line (onto the `foreach {` line). --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1626#discussion_r155279264 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala --- @@ -493,4 +492,102 @@ object PreAggregateUtil { updatedPlan } + /** + * Below method will be used to get the select query when rollup policy is + * applied in case of timeseries table + * @param tableSchema + * main data map schema + * @param selectedDataMapSchema + * selected data map schema for rollup + * @return select query based on rolloup + */ + def createTimeseriesSelectQueryForRollup( --- End diff -- After the compaction PR is merged, this duplicate function should be merged with the one introduced there. --- |
In reply to this post by qiuchenjian-2
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1626#discussion_r155279382 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala --- @@ -493,4 +492,102 @@ object PreAggregateUtil { updatedPlan } + /** + * Below method will be used to get the select query when rollup policy is + * applied in case of timeseries table + * @param tableSchema + * main data map schema + * @param selectedDataMapSchema + * selected data map schema for rollup + * @return select query based on rolloup + */ + def createTimeseriesSelectQueryForRollup( + tableSchema: TableSchema, + selectedDataMapSchema: AggregationDataMapSchema): String = { + val aggregateColumns = scala.collection.mutable.ArrayBuffer.empty[String] + val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String] + tableSchema.getListOfColumns.asScala.foreach { + a => if (a.getAggFunction.nonEmpty) { + aggregateColumns += s"${a.getAggFunction match { + case "count" => "sum" + case others@_ => others}}(${selectedDataMapSchema.getAggChildColByParent( + a.getParentColumnTableRelations.get(0).getColumnName, a.getAggFunction).getColumnName})" + } else if (a.getTimeSeriesFunction.nonEmpty) { + groupingExpressions += s"timeseries(${ + selectedDataMapSchema + .getNonAggNonTimeChildColBasedByParent(a.getParentColumnTableRelations. + get(0).getColumnName).getColumnName + } , '${ a.getTimeSeriesFunction }')" + } else { + groupingExpressions += selectedDataMapSchema + .getNonAggNonTimeChildColBasedByParent(a.getParentColumnTableRelations. 
+ get(0).getColumnName).getColumnName + } + } + s"select ${ groupingExpressions.mkString(",") },${ aggregateColumns.mkString(",") + } from ${selectedDataMapSchema.getChildSchema.getTableName } " + + s"group by ${ groupingExpressions.mkString(",") }" + } + + /** + * Below method will be used to creating select query for timeseries + * for lowest level for aggergation like second level, in that case it will + * hit the maintable + * @param tableSchema + * data map schema + * @param parentTableName + * parent schema + * @return select query for loading + */ + def createTimeSeriesSelectQueryFromMain(tableSchema: TableSchema, + parentTableName: String): String = { + val aggregateColumns = scala.collection.mutable.ArrayBuffer.empty[String] + val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String] + tableSchema.getListOfColumns.asScala.foreach { + a => + if (a.getAggFunction.nonEmpty) { + aggregateColumns += + s"${ a.getAggFunction }(${ a.getParentColumnTableRelations.get(0).getColumnName })" + } else if (a.getTimeSeriesFunction.nonEmpty) { + groupingExpressions += + s"timeseries(${ a.getParentColumnTableRelations.get(0).getColumnName },'${ + a + .getTimeSeriesFunction + }')" --- End diff -- format properly --- |
Free forum by Nabble | Edit this page |