GitHub user pawanmalwal opened a pull request:
https://github.com/apache/carbondata/pull/1472

    [WIP][CARBONDATA-1618] Table comment support for alter table

    Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily:

    - [X] Any interfaces changed? None
    - [X] Any backward compatibility impacted? None
    - [X] Document update required? None
    - [X] Testing done: test case added - TestAlterTableWithTableComment.scala
    - [X] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pawanmalwal/carbondata support_table_comment

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/1472.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1472

----

commit 9af83d9a5ab5b9b6de17997c583f37e7bcb65d4f
Author: Pawan Malwal <[hidden email]>
Date:   2017-11-06T12:40:14Z

    [WIP][CARBONDATA-1618] Table comment support for alter table

----

--- |
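For context, the feature under review lets a user attach a table-level comment through ALTER TABLE, remove it again, and read it back via DESCRIBE FORMATTED. A minimal usage sketch follows; the exact SQL and the table/comment names are inferred from the diffs in this thread, not quoted from the PR.

    // Hypothetical walk-through of the intended behaviour; names are made up.
    import org.apache.spark.sql.SparkSession

    val spark: SparkSession = ???  // an existing CarbonSession / SparkSession

    // Add or overwrite the table comment (stored as the "comment" table property).
    spark.sql("ALTER TABLE demo_table SET TBLPROPERTIES ('comment' = 'sales fact table')")

    // Remove the comment again.
    spark.sql("ALTER TABLE demo_table UNSET TBLPROPERTIES ('comment')")

    // DESCRIBE FORMATTED is extended in this PR to surface the stored comment.
    spark.sql("DESCRIBE FORMATTED demo_table").show(truncate = false)
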
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1472 Can one of the admins verify this patch? --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1472 Can one of the admins verify this patch? --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1472 add to whitelist --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1472#discussion_r149348219

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableSetCommand.scala ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.schema
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
+import org.apache.spark.util.AlterTableUtil
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.format.TableInfo
+import org.apache.carbondata.spark.rdd.{AlterTableAddColumnRDD, AlterTableDropColumnRDD}
+
+private[sql] case class AlterTableSetCommand(val tableIdentifier: TableIdentifier,
+    val properties: Map[String, String],
+    val isView: Boolean)
+  extends RunnableCommand with SchemaProcessCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    processSchema(sparkSession)
+  }
+
+  override def processSchema(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
--- End diff --

Refactor the code to extract common code from both commands.

--- |
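The refactor being asked for amounts to both the set and the unset command delegating to one shared utility instead of duplicating the lock handling and schema update. A rough sketch of that shape is below; it reuses the modifyTableComment helper quoted later in this thread, but the AlterTableUnsetCommand signature and the sessionState lookup are illustrative assumptions, not the PR's actual code.

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.execution.command._
    import org.apache.spark.sql.hive.CarbonSessionState
    import org.apache.spark.util.AlterTableUtil

    private[sql] case class AlterTableSetCommand(tableIdentifier: TableIdentifier,
        properties: Map[String, String], isView: Boolean)
      extends RunnableCommand with SchemaProcessCommand {

      override def run(sparkSession: SparkSession): Seq[Row] = processSchema(sparkSession)

      override def processSchema(sparkSession: SparkSession): Seq[Row] = {
        // set = true: add or overwrite the comment table property
        // (casting sessionState to CarbonSessionState is assumed here)
        AlterTableUtil.modifyTableComment(tableIdentifier, properties, Nil, set = true)(
          sparkSession, sparkSession.sessionState.asInstanceOf[CarbonSessionState])
        Seq.empty
      }
    }

    private[sql] case class AlterTableUnsetCommand(tableIdentifier: TableIdentifier,
        propKeys: Seq[String], ifExists: Boolean, isView: Boolean)
      extends RunnableCommand with SchemaProcessCommand {

      override def run(sparkSession: SparkSession): Seq[Row] = processSchema(sparkSession)

      override def processSchema(sparkSession: SparkSession): Seq[Row] = {
        // set = false: drop the comment key from the table properties again
        AlterTableUtil.modifyTableComment(tableIdentifier, Map.empty, propKeys, set = false)(
          sparkSession, sparkSession.sessionState.asInstanceOf[CarbonSessionState])
        Seq.empty
      }
    }
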
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1472#discussion_r149349120

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala ---
@@ -105,6 +105,10 @@ private[sql] case class CarbonDescribeFormattedCommand(
     results ++= Seq(("Table Name: ", relation.tableMeta.carbonTableIdentifier.getTableName, ""))
     results ++= Seq(("CARBON Store Path: ", relation.tableMeta.storePath, ""))
     val carbonTable = relation.tableMeta.carbonTable
+    // Carbon table support table comment
+    val tableComment = carbonTable.getTableInfo.getFactTable.getTableProperties
+      .getOrDefault("comment", "")
--- End diff --

Move `comment` to a constant in CarbonCommonConstants.

--- |
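For reference, once the literal is replaced the added lines would look roughly as follows. The TABLE_COMMENT constant is the one discussed later in this thread; the final results row and its "Comment" label are assumptions, since the quoted hunk is cut off before them.

    // Look the comment up via the shared constant rather than the bare "comment" literal.
    val tableComment = carbonTable.getTableInfo.getFactTable.getTableProperties
      .getOrDefault(CarbonCommonConstants.TABLE_COMMENT, "")
    // The hunk adds four lines; the fourth presumably surfaces the value in DESCRIBE FORMATTED,
    // e.g. something like:
    results ++= Seq(("Comment: ", tableComment, ""))
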
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1472 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/852/ --- |
Github user pawanmalwal commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1472#discussion_r149585846

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableSetCommand.scala ---
(same hunk as quoted in the review comment above)
--- End diff --

Handled the comment. Please review.

--- |
Github user pawanmalwal commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1472#discussion_r149585944

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableSetCommand.scala ---
(same hunk as quoted in the review comment above)
--- End diff --

Handled the comment, please review.

--- |
Github user pawanmalwal commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1472#discussion_r149586000

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala ---
(same hunk as quoted in the review comment above)
--- End diff --

Done.

--- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1472#discussion_r149587321

--- Diff: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---
@@ -1374,6 +1374,12 @@
   @CarbonProperty
   public static final String BITSET_PIPE_LINE = "carbon.use.bitset.pipe.line";
 
+  /**
+   * this will be used to provide comment for table
+   */
+  @CarbonProperty
--- End diff --

It's not a carbon property, it is just a constant.

--- |
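The point being made: the new key is only ever used as a table-property map key and is never read through the carbon.* configuration machinery, so the @CarbonProperty marker does not apply. On the Scala side its use (per the AlterTableUtil hunk quoted later in the thread) reduces to something like the sketch below; the remove call in the unset branch is inferred from the in-code comment, since the quoted hunk is cut off before it.

    // TABLE_COMMENT is a plain constant holding the property key "comment".
    if (set) {
      // set path: copy the supplied comment into the table properties
      properties.foreach { x =>
        if (x._1.equalsIgnoreCase(CarbonCommonConstants.TABLE_COMMENT)) {
          tblPropertiesMap.put(x._1, x._2)
        }
      }
    } else {
      // unset path: drop the comment key again (inferred; not shown in the quoted hunk)
      propKeys.foreach { x =>
        if (x.equalsIgnoreCase(CarbonCommonConstants.TABLE_COMMENT)) {
          tblPropertiesMap.remove(x)
        }
      }
    }
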
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1472 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/867/ --- |
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1472#discussion_r149587610

--- Diff: integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala ---
@@ -317,4 +321,84 @@ object AlterTableUtil {
     }
   }
 
+  /**
+   * This method add/modify the table comments.
+   *
+   * @param tableIdentifier
+   * @param properties
+   * @param propKeys
+   * @param set
+   * @param sparkSession
+   * @param sessionState
+   */
+  def modifyTableComment(tableIdentifier: TableIdentifier, properties: Map[String, String],
+      propKeys: Seq[String], set: Boolean)
+    (sparkSession: SparkSession, sessionState: CarbonSessionState): Unit = {
+    val tableName = tableIdentifier.table
+    val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+    LOGGER.audit(s"Alter table comment request has been received for $dbName.$tableName")
+    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
+    var locks = List.empty[ICarbonLock]
+    var timeStamp = 0L
+    var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]()
+    var carbonTable: CarbonTable = null
+    try {
+      locks = AlterTableUtil
+        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
+      val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      carbonTable = metastore
+        .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+        .tableMeta.carbonTable
+      // get the latest carbon table
+      // read the latest schema file
+      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+        carbonTable.getCarbonTableIdentifier)
+      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+      val schemaConverter = new ThriftWrapperSchemaConverterImpl()
+      val wrapperTableInfo = schemaConverter
+        .fromExternalToWrapperTableInfo(thriftTableInfo,
+          dbName,
+          tableName,
+          carbonTable.getStorePath)
+      val schemaEvolutionEntry = new org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
+      schemaEvolutionEntry.setTimeStamp(timeStamp)
+      val thriftTable = schemaConverter
+        .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+      val tblPropertiesMap: mutable.Map[String, String] =
+        thriftTable.fact_table.getTableProperties.asScala
+      if (set) {
+        // This overrides old properties and update the comment parameter of thriftTable
+        // with the newly added/modified comment since thriftTable also holds comment as its
+        // direct property.
+
+        properties.foreach {
+          x =>
--- End diff --

Move this onto the line above, i.e. properties.foreach { x =>

--- |
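In other words, the requested change is purely stylistic: put the closure parameter on the same line as the opening brace. Before and after, for the snippet in question (the closure body is taken from the fuller quote just below):

    // form in the diff
    properties.foreach {
      x =>
        if (x._1.equalsIgnoreCase(CarbonCommonConstants.TABLE_COMMENT)) {
          tblPropertiesMap.put(x._1, x._2)
        }
    }

    // requested form
    properties.foreach { x =>
      if (x._1.equalsIgnoreCase(CarbonCommonConstants.TABLE_COMMENT)) {
        tblPropertiesMap.put(x._1, x._2)
      }
    }
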
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1472#discussion_r149587728

--- Diff: integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala ---
(same modifyTableComment hunk as quoted above; the tail of the hunk continues as follows)
+        properties.foreach {
+          x =>
+            if (x._1.equalsIgnoreCase(CarbonCommonConstants.TABLE_COMMENT)) {
+              tblPropertiesMap.put(x._1, x._2)
+            }
+        }
+      } else {
+        // This removes the comment parameter from thriftTable
+        // since thriftTable also holds comment as its property.
+        propKeys.foreach {
+          x =>
--- End diff --

Move this onto the line above as well, i.e. propKeys.foreach { x =>

--- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1472 SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1483/ --- |
Github user pawanmalwal commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1472#discussion_r149593267

--- Diff: integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala ---
(same modifyTableComment hunk as quoted in the review comments above)
--- End diff --

Done.

--- |
Github user pawanmalwal commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1472#discussion_r149593291

--- Diff: integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala ---
(same modifyTableComment hunk as quoted in the review comments above)
--- End diff --

Done.

--- |
Github user pawanmalwal commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1472#discussion_r149593333

--- Diff: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---
(same hunk as quoted in the review comment above)
--- End diff --

Done.

--- |
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1472 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/871/ --- |
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1472 @pawanmalwal Please rebase the code; there is one conflict. --- |