GitHub user ravikiran23 opened a pull request:
https://github.com/apache/carbondata/pull/954

[WIP] IUD support in 2.1

1. Supporting the IUD feature in Spark 2.1.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ravikiran23/incubator-carbondata master-iud-2.1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/carbondata/pull/954.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #954

----
commit 8229c1423debceed8af8d2295a4ab5aed15f581a
Author: ravikiran23 <[hidden email]>
Date: 2017-05-26T06:14:53Z

IUD support in 2.1
----

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at [hidden email] or file a JIRA ticket with INFRA.
---

Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/954

add to whitelist

Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/954#discussion_r118870085

--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.iud
--- End diff --

Remove the duplicate test case in spark package

Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/954#discussion_r118870127

--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala ---
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.iud
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
--- End diff --

Remove the duplicate test case in spark package

Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/954#discussion_r118870355

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala ---
@@ -0,0 +1,857 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
--- End diff --

Check whether this class can be moved to the common package, and remove the duplicate code in the Spark 1.6 module.

Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/954#discussion_r118870584

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---
@@ -73,3 +78,136 @@ object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
     }
   }
 }
+
+object CarbonIUDAnalysisRule extends Rule[LogicalPlan] {
+
+  var sparkSession: SparkSession = _
+
+  def init(sparkSession: SparkSession) {
+    this.sparkSession = sparkSession
+  }
+
+  private def processUpdateQuery(
+      table: UnresolvedRelation,
+      columns: List[String],
+      selectStmt: String,
+      filter: String): LogicalPlan = {
+    var includedDestColumns = false
+    var includedDestRelation = false
+    var addedTupleId = false
+
+    def prepareTargetReleation(relation: UnresolvedRelation): SubqueryAlias = {
+      val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId",
+        Seq.empty, isDistinct = false), "tupleId")())
+      val projList = Seq(
+        UnresolvedAlias(UnresolvedStar(Option(table.alias.toSeq))), tupleId)
+      // include tuple id and rest of the required columns in subqury
+      SubqueryAlias(table.alias.getOrElse(""), Project(projList, relation), Option(table.tableIdentifier))
+    }
+    // get the un-analyzed logical plan
+    val targetTable = prepareTargetReleation(table)
+    val selectPlan = new SparkSqlParser(sparkSession.sessionState.conf).parsePlan(selectStmt) transform {
+      case Project(projectList, child) if (!includedDestColumns) =>
+        includedDestColumns = true
+        if (projectList.size != columns.size) {
+          sys.error("Number of source and destination columns are not matching")
+        }
+        val renamedProjectList = projectList.zip(columns).map{ case(attr, col) =>
+          attr match {
+            case UnresolvedAlias(child22, _) =>
+              UnresolvedAlias(Alias(child22, col + "-updatedColumn")())
+            case UnresolvedAttribute(param) =>
+              UnresolvedAlias(Alias(attr, col + "-updatedColumn")())
+            // UnresolvedAttribute(col + "-updatedColumn")
+// UnresolvedAlias(Alias(child, col + "-updatedColumn")())
+            case _ => attr
+          }
+        }
+        val list = Seq(
+          UnresolvedAlias(UnresolvedStar(Option(table.alias.toSeq)))) ++ renamedProjectList
+        Project(list, child)
+      case Filter(cond, child) if (!includedDestRelation) =>
+        includedDestRelation = true
+        Filter(cond, Join(child, targetTable, Inner, None))
+      case r @ UnresolvedRelation(t, a) if (!includedDestRelation &&
+          t != table.tableIdentifier) =>
+        includedDestRelation = true
+        Join(r, targetTable, Inner, None)
+    }
+    val updatedSelectPlan : LogicalPlan = if (!includedDestRelation) {
+      // special case to handle self join queries
+      // Eg. update tableName SET (column1) = (column1+1)
+      selectPlan transform {
+        case relation: UnresolvedRelation if (table.tableIdentifier == relation.tableIdentifier &&
+            addedTupleId == false) =>
+          addedTupleId = true
+          targetTable
+      }
+    } else {
+      selectPlan
+    }
+    val finalPlan = if (filter.length > 0) {
+      val alias = table.alias.getOrElse("")
+      var transformed: Boolean = false
+      // Create a dummy projection to include filter conditions
+      var newPlan: LogicalPlan = null
+      if (table.tableIdentifier.database.isDefined) {
+        newPlan = new SparkSqlParser(sparkSession.sessionState.conf).parsePlan("select * from " +
+          table.tableIdentifier.database
+            .getOrElse("") + "." +
+          table.tableIdentifier.table +
+          " " + alias + " " +
+          filter)
+      }
+      else {
+        newPlan = new SparkSqlParser(sparkSession.sessionState.conf).parsePlan("select * from " +
+          table.tableIdentifier.table +
+          " " + alias + " " +
+          filter)
+      }
+      newPlan transform {
+        case UnresolvedRelation(t, Some(a)) if (
+          !transformed && t == table.tableIdentifier && a == alias) =>
+          transformed = true
+          // Add the filter condition of update statement on destination table
+          SubqueryAlias(alias, updatedSelectPlan, Option(table.tableIdentifier))
+      }
+    } else {
+      updatedSelectPlan
+    }
+    val tid = CarbonTableIdentifierImplicit.toTableIdentifier(Seq(table.tableIdentifier.toString()))
+    val tidSeq = Seq(getDB.getDatabaseName(tid.database, sparkSession))
+    val destinationTable = UnresolvedRelation(table.tableIdentifier, table.alias)
+    ProjectForUpdate(destinationTable, columns, Seq(finalPlan))
+  }
+
+  def processDeleteRecordsQuery(selectStmt: String, table: UnresolvedRelation): LogicalPlan = {
+    // val tid = CarbonTableIdentifierImplicit.toTableIdentifier(Seq(table.tableIdentifier.toString()))
+    val tidSeq = Seq(getDB.getDatabaseName(table.tableIdentifier.database, sparkSession),
+      table.tableIdentifier.table)
+    var addedTupleId = false
+    val selectPlan = new SparkSqlParser(sparkSession.sessionState.conf).parsePlan(selectStmt) transform {
+      case relation: UnresolvedRelation if (table.tableIdentifier == relation.tableIdentifier &&
+          addedTupleId == false) =>
+        addedTupleId = true
+        val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId",
+          Seq.empty, isDistinct = false), "tupleId")())
+        val projList = Seq(
+          UnresolvedAlias(UnresolvedStar(Option(table.alias.toSeq))), tupleId)
--- End diff --

This won't work if the table has no alias. If there is no alias, it should be passed as None.

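The alias concern in this comment can be reproduced without Spark. The following is a minimal sketch, assuming only that `table.alias` is an `Option[String]` as the diff suggests. The names `AliasTarget`, `starTargetFromDiff` and `starTargetFixed` are hypothetical and used purely for illustration, and the second function is one possible reading of the suggestion, not necessarily the change that was merged.

```scala
object AliasTarget {
  // What the diff builds: Option(...) wraps any non-null value in Some,
  // so a missing alias becomes Some(Seq()) rather than None.
  def starTargetFromDiff(alias: Option[String]): Option[Seq[String]] =
    Option(alias.toList)

  // One possible reading of the review suggestion: stay None when no alias is given.
  def starTargetFixed(alias: Option[String]): Option[Seq[String]] =
    alias.map(a => Seq(a))
}
```

With an alias present both functions agree; they differ only for `None`, which is the case the reviewer points at.
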
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/954#discussion_r118870649

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala ---
@@ -113,6 +114,25 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
     output
   }
+  private def processPlan(plan: LogicalPlan): LogicalPlan = {
+    plan transform {
+      case ProjectForUpdate(table, cols, Seq(updatePlan)) =>
+        var isTransformed = false
+        val newPlan = updatePlan transform {
+          case Project(pList, child) if (!isTransformed) =>
+            val (dest: Seq[NamedExpression], source: Seq[NamedExpression]) = pList
+              .splitAt(pList.size - cols.size)
+            val diff = cols.diff(dest.map(_.name))
+            if (diff.size > 0) {
+              sys.error(s"Unknown column(s) ${diff.mkString(",")} in table ${table.tableName}")
+            }
+            isTransformed = true
+            Project(dest.filter(a => !cols.contains(a.name)) ++ source, child)
+        }
+        ProjectForUpdateCommand(newPlan, Seq(table.tableIdentifier.toString()))
--- End diff --

This cannot be sent as toString, as it gives a wrong value if you use a db name.

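A small sketch of why a single `toString` value is a problem once a database name is involved. `TableId` below is a hypothetical stand-in, not the Spark class, and its `toString` format is an assumption chosen for illustration. The point holds for any format: `Seq(id.toString)` always has exactly one element, so the database and table names can no longer be told apart by position.

```scala
object IdentifierParts {
  // Hypothetical stand-in for a table identifier with an optional database.
  // The real Spark class may render differently; only the shape matters here.
  final case class TableId(table: String, database: Option[String]) {
    override def toString: String =
      database.map(db => s"$db.$table").getOrElse(table)
  }

  // Mirrors the diff: the whole identifier collapses into one string element.
  def asSingleString(id: TableId): Seq[String] = Seq(id.toString)

  // Keeps database (when present) and table as separate elements.
  def asParts(id: TableId): Seq[String] = id.database.toList :+ id.table
}
```

A consumer that expects the table name in the last position gets the combined string from `asSingleString` whenever a database is set, while `asParts` keeps the two apart.
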
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/954#discussion_r118870746

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---
@@ -78,6 +80,128 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     AlterTableCompaction(altertablemodel)
   }
+  protected lazy val deleteRecords: Parser[LogicalPlan] =
+    (DELETE ~> FROM ~> table) ~ restInput.? <~ opt(";") ^^ {
+      case table ~ rest =>
+        val tableName = getTableName(table.tableIdentifier)
+        val alias = table.alias.getOrElse("")
+        DeleteRecords("select tupleId from " + tableName + " " + alias + rest.getOrElse(""), table)
+    }
+
+  protected lazy val updateTable: Parser[LogicalPlan] =
+    UPDATE ~> table ~
+    (SET ~> "(" ~> repsep(element, ",") <~ ")") ~
+    ("=" ~> restInput) <~ opt(";") ^^ {
+      case tab ~ columns ~ rest =>
+        val (sel, where) = splitQuery(rest)
+        val (selectStmt, relation) =
+          if (!sel.toLowerCase.startsWith("select ")) {
+            if (sel.trim.isEmpty) {
+              sys.error("At least one source column has to be specified ")
+            }
+            // only list of expression are given, need to convert that list of expressions into
+            // select statement on destination table
+            val relation = tab match {
+              case r@UnresolvedRelation(tableIdentifier, alias) =>
+                updateRelation(r, tableIdentifier, alias)
+              case _ => tab
+            }
+            ("select " + sel + " from " + getTableName(relation.tableIdentifier) + " " +
+              relation.alias.get, relation)
+          } else {
+            (sel, updateRelation(tab, tab.tableIdentifier, tab.alias))
+          }
+        UpdateTable(relation, columns, selectStmt, where)
+    }
+
+  private def updateRelation(
+      r: UnresolvedRelation,
+      tableIdentifier: Seq[String],
+      alias: Option[String]): UnresolvedRelation = {
+    alias match {
+      case Some(_) => r
+      case _ =>
+        val tableAlias = tableIdentifier match {
+          case Seq(dbName, tableName) => Some(tableName)
+          case Seq(tableName) => Some(tableName)
+        }
+        UnresolvedRelation(tableIdentifier, Option(tableAlias.toString))
--- End diff --

Just pass tableAlias as is; it is already an Option.

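The effect of `Option(tableAlias.toString)` is plain Scala behaviour and can be checked in isolation. A minimal sketch follows; `AliasWrapping`, `wrappedAgain` and `passedThrough` are hypothetical names introduced only for this illustration.

```scala
object AliasWrapping {
  // Mirrors the diff: toString on an Option renders the wrapper itself,
  // so the alias text becomes "Some(<name>)" instead of "<name>".
  def wrappedAgain(tableAlias: Option[String]): Option[String] =
    Option(tableAlias.toString)

  // The review suggestion: tableAlias is already an Option, so pass it through.
  def passedThrough(tableAlias: Option[String]): Option[String] = tableAlias
}
```

For an alias `Some("t1")`, the first form yields the literal string `Some(t1)` as the alias, which is why the reviewer asks for the value to be passed unchanged.
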
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/954#discussion_r118870847

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala ---
@@ -86,6 +87,29 @@ case class ShowLoadsCommand(databaseNameOp: Option[String], table: String, limit
   }
 }
+case class ProjectForUpdate(
+    table: UnresolvedRelation,
+    columns: List[String],
+    child: Seq[LogicalPlan] ) extends Command {
--- End diff --

It cannot extend Command, as it needs to be analyzed in the analysis phase, so it should extend LogicalPlan in 2.1.

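The reasoning behind this comment can be illustrated with a toy plan tree. This is only a sketch under an assumption the comment implies: that a command-style node does not expose its nested plans as children, so tree rewrites never reach them. None of the types below (`Plan`, `CommandLike`, `PlanLike`, `resolve`) are Spark classes; they are hypothetical stand-ins.

```scala
object ToyPlans {
  // Toy plan tree; none of these types are Spark classes.
  sealed trait Plan {
    def children: Seq[Plan]
    def withChildren(newChildren: Seq[Plan]): Plan

    // Bottom-up rewrite that only reaches nodes exposed through `children`.
    def transformUp(rule: PartialFunction[Plan, Plan]): Plan = {
      val rewritten = withChildren(children.map(_.transformUp(rule)))
      rule.applyOrElse(rewritten, identity[Plan])
    }
  }

  final case class Unresolved(name: String) extends Plan {
    def children: Seq[Plan] = Nil
    def withChildren(newChildren: Seq[Plan]): Plan = this
  }

  final case class Resolved(name: String) extends Plan {
    def children: Seq[Plan] = Nil
    def withChildren(newChildren: Seq[Plan]): Plan = this
  }

  // Assumption: models a command-style leaf that keeps its nested plan in a field only.
  final case class CommandLike(nested: Plan) extends Plan {
    def children: Seq[Plan] = Nil
    def withChildren(newChildren: Seq[Plan]): Plan = this
  }

  // Models an ordinary plan node that exposes the nested plan as a child.
  final case class PlanLike(nested: Plan) extends Plan {
    def children: Seq[Plan] = Seq(nested)
    def withChildren(newChildren: Seq[Plan]): Plan = copy(nested = newChildren.head)
  }

  // Stand-in for an analysis rule that resolves relations.
  val resolve: PartialFunction[Plan, Plan] = {
    case Unresolved(name) => Resolved(name)
  }
}
```

In this toy model the rule leaves the plan nested inside `CommandLike` untouched, while the same plan under `PlanLike` is rewritten, which matches the reviewer's point that the node has to take part in analysis as a regular plan.
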
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/954

LGTM

Github user asfgit closed the pull request at:
https://github.com/apache/carbondata/pull/954