Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2990 Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2054/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2990 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1850/ --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2990#discussion_r242837109 --- Diff: core/src/main/java/org/apache/carbondata/core/features/TableOperation.java --- @@ -21,7 +21,7 @@ ALTER_RENAME, ALTER_DROP, ALTER_ADD_COLUMN, - ALTER_CHANGE_DATATYPE, + ALTER_COL_RENAME_AND_CHANGE_DATATYPE, --- End diff -- Keep column rename and datatype change as separate operations and make this change wherever applicable --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2990#discussion_r242837854 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala --- @@ -641,7 +641,7 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll { val ex3 = intercept[MalformedCarbonCommandException] { sql("alter table datamap_test7 change id id BIGINT") } - assert(ex3.getMessage.contains("alter table change datatype is not supported")) + assert(ex3.getMessage.contains("alter table change datatype or column rename is not supported")) --- End diff -- Perform the validation separately for column rename and datatype change and change the message as per change in the TableOperation enum --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2990#discussion_r242849005 --- Diff: integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala --- @@ -1487,31 +1487,46 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { * @param values * @return */ - def parseDataType(dataType: String, values: Option[List[(Int, Int)]]): DataTypeInfo = { + def parseDataType( + dataType: String, + values: Option[List[(Int, Int)]], + isColumnRename: Boolean): DataTypeInfo = { + def validateAndGetDecimalDatatype: DataTypeInfo = { + var precision: Int = 0 + var scale: Int = 0 + if (values.isDefined) { + precision = values.get(0)._1 + scale = values.get(0)._2 + } else { + throw new MalformedCarbonCommandException("Decimal format provided is invalid") + } + // precision should be > 0 and <= 38 and scale should be >= 0 and <= 38 + if (precision < 1 || precision > 38) { + throw new MalformedCarbonCommandException("Invalid value for precision") + } else if (scale < 0 || scale > 38) { + throw new MalformedCarbonCommandException("Invalid value for scale") + } + DataTypeInfo("decimal", precision, scale) + } + dataType match { case "bigint" | "long" => if (values.isDefined) { throw new MalformedCarbonCommandException("Invalid data type") } DataTypeInfo(dataType) case "decimal" => - var precision: Int = 0 - var scale: Int = 0 - if (values.isDefined) { - precision = values.get(0)._1 - scale = values.get(0)._2 + validateAndGetDecimalDatatype --- End diff -- This change is not required; you can revert it. Handle the rename scenario in the default case instead. --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2990#discussion_r242870325 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala --- @@ -318,15 +322,27 @@ class CarbonFileMetastore extends CarbonMetaStore { */ def revertTableSchemaInAlterFailure(carbonTableIdentifier: CarbonTableIdentifier, thriftTableInfo: org.apache.carbondata.format.TableInfo, - absoluteTableIdentifier: AbsoluteTableIdentifier)(sparkSession: SparkSession): String = { + absoluteTableIdentifier: AbsoluteTableIdentifier, + timeStamp: Long)(sparkSession: SparkSession): String = { val schemaConverter = new ThriftWrapperSchemaConverterImpl val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo( thriftTableInfo, carbonTableIdentifier.getDatabaseName, carbonTableIdentifier.getTableName, absoluteTableIdentifier.getTablePath) val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history - evolutionEntries.remove(evolutionEntries.size() - 1) + // we may need to remove two evolution entries if the operation is both col rename and datatype + // change operation + if (evolutionEntries.size() > 1 && + (evolutionEntries.get(evolutionEntries.size() - 1).time_stamp == + evolutionEntries.get(evolutionEntries.size() - 2).time_stamp)) { + evolutionEntries.remove(evolutionEntries.size() - 1) + evolutionEntries.remove(evolutionEntries.size() - 2) + } else { + if (evolutionEntries.get(evolutionEntries.size() - 1).time_stamp == timeStamp) { + evolutionEntries.remove(evolutionEntries.size() - 1) + } --- End diff -- Better not to hardcode the number of entries. Iterate over the entries from the end and keep removing them while their timestamp equals the operation's timestamp. Note also that after the first `remove`, `evolutionEntries.size() - 2` no longer refers to the original second-to-last entry, so the second `remove` deletes the wrong entry. --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2990#discussion_r242871800 --- Diff: integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala --- @@ -249,32 +249,99 @@ object AlterTableUtil { * @param timeStamp * @param sparkSession */ - def revertDataTypeChanges(dbName: String, tableName: String, timeStamp: Long) + def revertColumnRenameAndDataTypeChanges(dbName: String, tableName: String, timeStamp: Long) (sparkSession: SparkSession): Unit = { val metastore = CarbonEnv.getInstance(sparkSession).carbonMetaStore val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable) val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history - val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp - if (updatedTime == timeStamp) { - LOGGER.error(s"Reverting changes for $dbName.$tableName") - val removedColumns = evolutionEntryList.get(evolutionEntryList.size() - 1).removed - thriftTable.fact_table.table_columns.asScala.foreach { columnSchema => - removedColumns.asScala.foreach { removedColumn => - if (columnSchema.column_id.equalsIgnoreCase(removedColumn.column_id) && - !columnSchema.isInvisible) { - columnSchema.setData_type(removedColumn.data_type) - columnSchema.setPrecision(removedColumn.precision) - columnSchema.setScale(removedColumn.scale) - } + // here, there can be maximum of two entries for schemaEvolution, when my operation is + // both column rename and datatype change. 
So check if last two Evolution entry timestamp is + // same, then it is both column rename and datatype change, so revert two entries,else one entry + if (evolutionEntryList.size() > 1 && + (evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp == timeStamp) && + (evolutionEntryList.get(evolutionEntryList.size() - 2).time_stamp == timeStamp)) { + LOGGER.error(s"Reverting column rename and datatype changes for $dbName.$tableName") + revertColumnSchemaChanges(thriftTable, evolutionEntryList, true) + } else { + if (evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp == timeStamp) { + LOGGER.error(s"Reverting changes for $dbName.$tableName") + revertColumnSchemaChanges(thriftTable, evolutionEntryList, false) + } + } + metastore + .revertTableSchemaInAlterFailure(carbonTable.getCarbonTableIdentifier, + thriftTable, carbonTable.getAbsoluteTableIdentifier, timeStamp)(sparkSession) + } + + /** + * This method reverts the column schema in case of failure in alter datatype change or col rename + * @param thriftTable thrift table + * @param evolutionEntryList SchemaEvolutionEntry List + * @param isBothColRenameAndDataTypeChange true if operation done is noth rename and datatype chng + */ + private def revertColumnSchemaChanges(thriftTable: TableInfo, + evolutionEntryList: util.List[SchemaEvolutionEntry], + isBothColRenameAndDataTypeChange: Boolean): Unit = { + var removedColumns: mutable.Buffer[org.apache.carbondata.format.ColumnSchema] = null + if (isBothColRenameAndDataTypeChange) { + removedColumns = evolutionEntryList.get(evolutionEntryList.size() - 1).removed.asScala ++ + evolutionEntryList.get(evolutionEntryList.size() - 2).removed.asScala + } else { + removedColumns = evolutionEntryList.get(evolutionEntryList.size() - 1).removed.asScala + } + thriftTable.fact_table.table_columns.asScala.foreach { columnSchema => + removedColumns.foreach { removedColumn => + if (columnSchema.column_id.equalsIgnoreCase(removedColumn.column_id) && + 
!columnSchema.isInvisible) { + columnSchema.setColumn_name(removedColumn.column_name) + columnSchema.setData_type(removedColumn.data_type) + columnSchema.setPrecision(removedColumn.precision) + columnSchema.setScale(removedColumn.scale) } } - metastore - .revertTableSchemaInAlterFailure(carbonTable.getCarbonTableIdentifier, - thriftTable, carbonTable.getAbsoluteTableIdentifier)(sparkSession) } } + /** + * This method modifies the table properties if column rename happened + * + * @param tableProperties + */ + def modifyTablePropertiesAfterColumnRename( + tableProperties: mutable.Map[String, String], + oldColumnName: String, + newColumnName: String): Unit = { + tableProperties.foreach { tableProperty => + if (tableProperty._2.contains(oldColumnName)) { --- End diff -- Verify case handling here: `contains(oldColumnName)` is a case-sensitive check, so property values that reference the column in a different case would be missed --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2990#discussion_r242870689 --- Diff: integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala --- @@ -249,32 +249,99 @@ object AlterTableUtil { * @param timeStamp * @param sparkSession */ - def revertDataTypeChanges(dbName: String, tableName: String, timeStamp: Long) + def revertColumnRenameAndDataTypeChanges(dbName: String, tableName: String, timeStamp: Long) (sparkSession: SparkSession): Unit = { val metastore = CarbonEnv.getInstance(sparkSession).carbonMetaStore val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable) val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history - val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp - if (updatedTime == timeStamp) { - LOGGER.error(s"Reverting changes for $dbName.$tableName") - val removedColumns = evolutionEntryList.get(evolutionEntryList.size() - 1).removed - thriftTable.fact_table.table_columns.asScala.foreach { columnSchema => - removedColumns.asScala.foreach { removedColumn => - if (columnSchema.column_id.equalsIgnoreCase(removedColumn.column_id) && - !columnSchema.isInvisible) { - columnSchema.setData_type(removedColumn.data_type) - columnSchema.setPrecision(removedColumn.precision) - columnSchema.setScale(removedColumn.scale) - } + // here, there can be maximum of two entries for schemaEvolution, when my operation is + // both column rename and datatype change. 
So check if last two Evolution entry timestamp is + // same, then it is both column rename and datatype change, so revert two entries,else one entry + if (evolutionEntryList.size() > 1 && + (evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp == timeStamp) && + (evolutionEntryList.get(evolutionEntryList.size() - 2).time_stamp == timeStamp)) { + LOGGER.error(s"Reverting column rename and datatype changes for $dbName.$tableName") + revertColumnSchemaChanges(thriftTable, evolutionEntryList, true) + } else { + if (evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp == timeStamp) { + LOGGER.error(s"Reverting changes for $dbName.$tableName") + revertColumnSchemaChanges(thriftTable, evolutionEntryList, false) + } --- End diff -- same comment as above --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2990#discussion_r242861256 --- Diff: integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala --- @@ -38,25 +37,27 @@ trait SqlAstBuilderHelper extends SparkSqlAstBuilder { override def visitChangeColumn(ctx: ChangeColumnContext): LogicalPlan = { val newColumn = visitColType(ctx.colType) + var isColumnRename = false --- End diff -- Modify it to val --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2990#discussion_r242863058 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala --- @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command.schema + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.execution.command.{AlterTableColRenameAndDataTypeChangeModel, DataTypeInfo, MetadataCommand} +import org.apache.spark.sql.hive.CarbonSessionCatalog +import org.apache.spark.util.AlterTableUtil + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.features.TableOperation +import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl +import org.apache.carbondata.core.metadata.datatype.DecimalType +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn +import org.apache.carbondata.events.{AlterTableColRenameAndDataTypeChangePostEvent, AlterTableColRenameAndDataTypeChangePreEvent, OperationContext, OperationListenerBus} +import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo} +import org.apache.carbondata.spark.util.DataTypeConverterUtil + +private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand( + alterTableColRenameAndDataTypeChangeModel: AlterTableColRenameAndDataTypeChangeModel) + extends MetadataCommand { + + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { + val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + val tableName = alterTableColRenameAndDataTypeChangeModel.tableName + val dbName = alterTableColRenameAndDataTypeChangeModel.databaseName + .getOrElse(sparkSession.catalog.currentDatabase) + var isColumnRenameOnly = false + var isDataTypeChangeOnly = false + var isBothColRenameAndDataTypeChange = false + setAuditTable(dbName, tableName) 
+ setAuditInfo(Map( + "column" -> alterTableColRenameAndDataTypeChangeModel.columnName, + "newColumn" -> alterTableColRenameAndDataTypeChangeModel.newColumnName, + "newType" -> alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.dataType)) + val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK) + var locks = List.empty[ICarbonLock] + // get the latest carbon table and check for column existence + var carbonTable: CarbonTable = null + var timeStamp = 0L + try { + locks = AlterTableUtil + .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession) + val metastore = CarbonEnv.getInstance(sparkSession).carbonMetaStore + carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) + if (!carbonTable.canAllow(carbonTable, TableOperation.ALTER_COL_RENAME_AND_CHANGE_DATATYPE, + alterTableColRenameAndDataTypeChangeModel.columnName)) { + throw new MalformedCarbonCommandException( + "alter table change datatype or column rename is not supported for index datamap") + } + val operationContext = new OperationContext + val alterTableColRenameAndDataTypeChangePreEvent = + AlterTableColRenameAndDataTypeChangePreEvent(sparkSession, carbonTable, + alterTableColRenameAndDataTypeChangeModel) + OperationListenerBus.getInstance() + .fireEvent(alterTableColRenameAndDataTypeChangePreEvent, operationContext) + val newColumnName = alterTableColRenameAndDataTypeChangeModel.newColumnName + val oldColumnName = alterTableColRenameAndDataTypeChangeModel.columnName + val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible) + if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(oldColumnName))) { + throwMetadataException(dbName, tableName, s"Column does not exist: $oldColumnName") + } + + val carbonColumn = carbonColumns.filter(_.getColName.equalsIgnoreCase(oldColumnName)) --- End diff -- Rename `carbonColumn ` to `oldCarbonColumn` or `columnToModify` --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2990#discussion_r242869441 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala --- @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command.schema + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.execution.command.{AlterTableColRenameAndDataTypeChangeModel, DataTypeInfo, MetadataCommand} +import org.apache.spark.sql.hive.CarbonSessionCatalog +import org.apache.spark.util.AlterTableUtil + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.features.TableOperation +import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl +import org.apache.carbondata.core.metadata.datatype.DecimalType +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn +import org.apache.carbondata.events.{AlterTableColRenameAndDataTypeChangePostEvent, AlterTableColRenameAndDataTypeChangePreEvent, OperationContext, OperationListenerBus} +import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo} +import org.apache.carbondata.spark.util.DataTypeConverterUtil + +private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand( + alterTableColRenameAndDataTypeChangeModel: AlterTableColRenameAndDataTypeChangeModel) + extends MetadataCommand { + + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { + val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + val tableName = alterTableColRenameAndDataTypeChangeModel.tableName + val dbName = alterTableColRenameAndDataTypeChangeModel.databaseName + .getOrElse(sparkSession.catalog.currentDatabase) + var isColumnRenameOnly = false + var isDataTypeChangeOnly = false + var isBothColRenameAndDataTypeChange = false --- End diff -- Using only 1 flag 
can serve the required purpose. Keep only the local flag `isDataTypeChangeOnly` and modify the code logic accordingly; for rename, use the `isColumnRename` flag from the `alterTableColRenameAndDataTypeChangeModel` model --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2990#discussion_r242871412 --- Diff: integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala --- @@ -249,32 +249,99 @@ object AlterTableUtil { * @param timeStamp * @param sparkSession */ - def revertDataTypeChanges(dbName: String, tableName: String, timeStamp: Long) + def revertColumnRenameAndDataTypeChanges(dbName: String, tableName: String, timeStamp: Long) (sparkSession: SparkSession): Unit = { val metastore = CarbonEnv.getInstance(sparkSession).carbonMetaStore val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable) val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history - val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp - if (updatedTime == timeStamp) { - LOGGER.error(s"Reverting changes for $dbName.$tableName") - val removedColumns = evolutionEntryList.get(evolutionEntryList.size() - 1).removed - thriftTable.fact_table.table_columns.asScala.foreach { columnSchema => - removedColumns.asScala.foreach { removedColumn => - if (columnSchema.column_id.equalsIgnoreCase(removedColumn.column_id) && - !columnSchema.isInvisible) { - columnSchema.setData_type(removedColumn.data_type) - columnSchema.setPrecision(removedColumn.precision) - columnSchema.setScale(removedColumn.scale) - } + // here, there can be maximum of two entries for schemaEvolution, when my operation is + // both column rename and datatype change. 
So check if last two Evolution entry timestamp is + // same, then it is both column rename and datatype change, so revert two entries,else one entry + if (evolutionEntryList.size() > 1 && + (evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp == timeStamp) && + (evolutionEntryList.get(evolutionEntryList.size() - 2).time_stamp == timeStamp)) { + LOGGER.error(s"Reverting column rename and datatype changes for $dbName.$tableName") + revertColumnSchemaChanges(thriftTable, evolutionEntryList, true) + } else { + if (evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp == timeStamp) { + LOGGER.error(s"Reverting changes for $dbName.$tableName") + revertColumnSchemaChanges(thriftTable, evolutionEntryList, false) + } + } + metastore + .revertTableSchemaInAlterFailure(carbonTable.getCarbonTableIdentifier, + thriftTable, carbonTable.getAbsoluteTableIdentifier, timeStamp)(sparkSession) + } + + /** + * This method reverts the column schema in case of failure in alter datatype change or col rename + * @param thriftTable thrift table + * @param evolutionEntryList SchemaEvolutionEntry List + * @param isBothColRenameAndDataTypeChange true if operation done is noth rename and datatype chng + */ + private def revertColumnSchemaChanges(thriftTable: TableInfo, + evolutionEntryList: util.List[SchemaEvolutionEntry], + isBothColRenameAndDataTypeChange: Boolean): Unit = { + var removedColumns: mutable.Buffer[org.apache.carbondata.format.ColumnSchema] = null + if (isBothColRenameAndDataTypeChange) { + removedColumns = evolutionEntryList.get(evolutionEntryList.size() - 1).removed.asScala ++ + evolutionEntryList.get(evolutionEntryList.size() - 2).removed.asScala + } else { + removedColumns = evolutionEntryList.get(evolutionEntryList.size() - 1).removed.asScala --- End diff -- I can see the same code in multiple classes. Try and refine the code and move this method at a common place --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2990#discussion_r242865160 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala --- @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command.schema + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.execution.command.{AlterTableColRenameAndDataTypeChangeModel, DataTypeInfo, MetadataCommand} +import org.apache.spark.sql.hive.CarbonSessionCatalog +import org.apache.spark.util.AlterTableUtil + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.features.TableOperation +import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl +import org.apache.carbondata.core.metadata.datatype.DecimalType +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn +import org.apache.carbondata.events.{AlterTableColRenameAndDataTypeChangePostEvent, AlterTableColRenameAndDataTypeChangePreEvent, OperationContext, OperationListenerBus} +import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo} +import org.apache.carbondata.spark.util.DataTypeConverterUtil + +private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand( + alterTableColRenameAndDataTypeChangeModel: AlterTableColRenameAndDataTypeChangeModel) + extends MetadataCommand { + + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { + val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + val tableName = alterTableColRenameAndDataTypeChangeModel.tableName + val dbName = alterTableColRenameAndDataTypeChangeModel.databaseName + .getOrElse(sparkSession.catalog.currentDatabase) + var isColumnRenameOnly = false + var isDataTypeChangeOnly = false + var isBothColRenameAndDataTypeChange = false + setAuditTable(dbName, tableName) 
+ setAuditInfo(Map( + "column" -> alterTableColRenameAndDataTypeChangeModel.columnName, + "newColumn" -> alterTableColRenameAndDataTypeChangeModel.newColumnName, + "newType" -> alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.dataType)) + val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK) + var locks = List.empty[ICarbonLock] + // get the latest carbon table and check for column existence + var carbonTable: CarbonTable = null + var timeStamp = 0L + try { + locks = AlterTableUtil + .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession) + val metastore = CarbonEnv.getInstance(sparkSession).carbonMetaStore + carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) + if (!carbonTable.canAllow(carbonTable, TableOperation.ALTER_COL_RENAME_AND_CHANGE_DATATYPE, + alterTableColRenameAndDataTypeChangeModel.columnName)) { + throw new MalformedCarbonCommandException( + "alter table change datatype or column rename is not supported for index datamap") + } + val operationContext = new OperationContext + val alterTableColRenameAndDataTypeChangePreEvent = + AlterTableColRenameAndDataTypeChangePreEvent(sparkSession, carbonTable, + alterTableColRenameAndDataTypeChangeModel) + OperationListenerBus.getInstance() + .fireEvent(alterTableColRenameAndDataTypeChangePreEvent, operationContext) + val newColumnName = alterTableColRenameAndDataTypeChangeModel.newColumnName + val oldColumnName = alterTableColRenameAndDataTypeChangeModel.columnName + val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible) + if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(oldColumnName))) { + throwMetadataException(dbName, tableName, s"Column does not exist: $oldColumnName") + } + + val carbonColumn = carbonColumns.filter(_.getColName.equalsIgnoreCase(oldColumnName)) + if (carbonColumn.size != 1) { + throwMetadataException(dbName, tableName, s"Invalid Column: $oldColumnName") + } + val 
newColumnPrecision = alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.precision + val newColumnScale = alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.scale + if (alterTableColRenameAndDataTypeChangeModel.isColumnRename) { + // check whether new column name is already an existing column name + if (carbonColumns.exists(_.getColName.equalsIgnoreCase(newColumnName))) { + throw new MalformedCarbonCommandException(s"Column Rename Operation failed. New " + + s"column name $newColumnName already exists" + + s" in table $tableName") + } + + // if the column rename is for complex column, block the operation + if (carbonColumn.head.isComplex) { + throw new MalformedCarbonCommandException(s"Column Rename Operation failed. Rename " + + s"column is unsupported for complex datatype " + + s"column ${ carbonColumn.head.getColName}") + } + + // if the datatype is source datatype, then it is just a column rename operation, else do + // the datatype validation for not source datatype + if (carbonColumn.head.getDataType.getName + .equalsIgnoreCase(alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.dataType)) { + val oldColumnPrecision = carbonColumn.head.getDataType.asInstanceOf[DecimalType] + .getPrecision + val oldColumnScale = carbonColumn.head.getDataType.asInstanceOf[DecimalType].getScale + if (carbonColumn.head.getDataType.getName.equalsIgnoreCase("decimal") && + (!oldColumnPrecision --- End diff -- Scale and precision are integers, so compare them with the `!=` operator. Also, the `asInstanceOf[DecimalType]` casts run before the `decimal` check, so they will throw a `ClassCastException` for non-decimal columns; perform them only inside the decimal branch. --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2990#discussion_r242870457 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala --- @@ -199,11 +202,22 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { */ override def revertTableSchemaInAlterFailure(carbonTableIdentifier: CarbonTableIdentifier, thriftTableInfo: format.TableInfo, - identifier: AbsoluteTableIdentifier) + identifier: AbsoluteTableIdentifier, + timeStamp: Long) (sparkSession: SparkSession): String = { val schemaConverter = new ThriftWrapperSchemaConverterImpl val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history - evolutionEntries.remove(evolutionEntries.size() - 1) + // we may need to remove two evolution entries if the operation is both col rename and datatype + // change operation + if (evolutionEntries.size() > 1 && (evolutionEntries.get(evolutionEntries.size() - 1).time_stamp + == evolutionEntries.get(evolutionEntries.size() - 2).time_stamp)) { + evolutionEntries.remove(evolutionEntries.size() - 1) + evolutionEntries.remove(evolutionEntries.size() - 2) + } else { + if (evolutionEntries.get(evolutionEntries.size() - 1).time_stamp == timeStamp) { + evolutionEntries.remove(evolutionEntries.size() - 1) + } --- End diff -- Same comment as above --- |
In reply to this post by qiuchenjian-2
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2990#discussion_r242860813 --- Diff: integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala --- @@ -171,11 +171,12 @@ case class DropPartitionCallableModel(carbonLoadModel: CarbonLoadModel, case class DataTypeInfo(dataType: String, precision: Int = 0, scale: Int = 0) -case class AlterTableDataTypeChangeModel(dataTypeInfo: DataTypeInfo, +case class AlterTableColRenameAndDataTypeChangeModel(dataTypeInfo: DataTypeInfo, --- End diff -- Segregate the rename and datatype change models --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2990 Build Failed with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10105/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2990 Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2058/ --- |
In reply to this post by qiuchenjian-2
Github user akashrn5 commented on the issue:
https://github.com/apache/carbondata/pull/2990 @manishgupta88 handled comments, please review --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2990 Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1853/ --- |
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2990 Build Success with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10108/ --- |
Free forum by Nabble | Edit this page |