Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178758267

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---
@@ -136,6 +140,7 @@ case class CarbonAlterTableDropHivePartitionCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     var locks = List.empty[ICarbonLock]
     val uniqueId = System.currentTimeMillis().toString
+    val childCommands = operationContext.getProperty("dropPartitionCommands")
--- End diff --

Remove this val; it is unused.
--- |
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178761202

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---
@@ -147,6 +152,18 @@ case class CarbonAlterTableDropHivePartitionCommand(
         table.getDatabaseName, table.getTableName, locksToBeAcquired)(sparkSession)
+      // If flow is for child table then get the uuid from operation context.
+      // If flow is for parent table then generate uuid for child flows and set the uuid to ""
+      // for parent table
+      // If normal table then set uuid to "".
+      val uuid = if (table.isChildDataMap) {
+        Option(operationContext.getProperty("uuid")).getOrElse("").toString
--- End diff --

When isChildDataMap is true, it is mandatory that the UUID has already been set by the parent table. If it is not set, log a message here, or better, throw an exception from here.
--- |
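A minimal sketch of the hardening this comment suggests, covering only the child-datamap branch (the exception type and message are assumptions, not the project's final choice; the parent/normal-table branches stay as in the diff):

    // For a child datamap the parent flow must already have set the uuid in
    // the operation context; fail fast instead of silently committing with "".
    val uuid = if (table.isChildDataMap) {
      Option(operationContext.getProperty("uuid")).map(_.toString).getOrElse {
        throw new RuntimeException(
          s"uuid not set by parent table flow for child datamap ${table.getTableName}")
      }
    } else {
      ""
    }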
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178719972

--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala ---
@@ -176,6 +179,17 @@ class TestPreAggregateCompaction extends QueryTest with BeforeAndAfterEach with
     segmentNamesSum.sorted should equal (Array("0", "0.1", "0.2", "1", "2", "3", "4", "5", "6", "7"))
   }
+  test("test auto compaction on aggregate table") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    val segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString)
+    segmentNamesSum.sorted should equal (Array("0", "0.1", "1", "2", "3"))
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
--- End diff --

1. Wrap the test body in try/finally. On any failure the property needs to be reset to its default value, otherwise subsequent test cases might fail.
2. Replace "false" with the default value from CarbonCommonConstants.
--- |
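A sketch of the suggested rewrite; CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE is assumed to be the default-value constant referred to in point 2:

    test("test auto compaction on aggregate table") {
      CarbonProperties.getInstance()
        .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
      try {
        sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
        sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
        sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
        sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
        val segmentNamesSum = sql("show segments for table maintable_preagg_sum")
          .collect().map(_.get(0).toString)
        segmentNamesSum.sorted should equal (Array("0", "0.1", "1", "2", "3"))
      } finally {
        // Reset to the default even when an assertion fails, so that later
        // test cases do not run with auto load merge still enabled.
        CarbonProperties.getInstance().addProperty(
          CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
          CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
      }
    }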
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178719497

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java ---
@@ -130,15 +130,20 @@ public CarbonOutputCommitter(Path outputPath, TaskAttemptContext context) throws
           loadModel.getTablePath());
       newMetaEntry.setSegmentFile(segmentFileName + CarbonTablePath.SEGMENT_EXT);
     }
+    OperationContext operationContext = (OperationContext) getOperationContext();
+    String uuid = "";
+    if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isChildDataMap() &&
+        operationContext != null) {
+      uuid = operationContext.getProperty("uuid").toString();
+    }
     CarbonLoaderUtil
         .populateNewLoadMetaEntry(newMetaEntry, SegmentStatus.SUCCESS, loadModel.getFactTimeStamp(), true);
     CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
     long segmentSize = CarbonLoaderUtil
         .addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(), carbonTable);
     if (segmentSize > 0 || overwriteSet) {
-      Object operationContext = getOperationContext();
-      if (operationContext != null) {
+      if (operationContext != null && carbonTable.hasAggregationDataMap()) {
         ((OperationContext) operationContext)
--- End diff --

Remove the typecast here as well; operationContext is already declared as OperationContext above.
--- |
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178727711 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala --- @@ -0,0 +1,488 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain id copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.carbondata.spark.testsuite.standardpartition + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, Row} +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +class StandardPartitionWithPreaggregateTestCase extends QueryTest with BeforeAndAfterAll { + + val testData = s"$resourcesPath/sample.csv" + + override def beforeAll(): Unit = { + sql("drop database if exists partition_preaggregate cascade") + sql("create database partition_preaggregate") + sql("use partition_preaggregate") + sql( + """ + | CREATE TABLE par(id INT, name STRING, age INT) PARTITIONED BY(city STRING) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql( + """ + | CREATE TABLE maintable(id int, name string, city string) partitioned by (age int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + } + + override def afterAll(): Unit = { + sql("drop database if exists partition_preaggregate cascade") + sql("use default") + } + + // Create aggregate table on partition with partition column in aggregation only. + test("test preaggregate table creation on partition table with partition col as aggregation") { + sql("create datamap p1 on table par using 'preaggregate' as select id, sum(city) from par group by id") + assert(!CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p1")(sqlContext.sparkSession).isHivePartitionTable) + } + + // Create aggregate table on partition with partition column in projection and aggregation only. + test("test preaggregate table creation on partition table with partition col as projection") { + sql("create datamap p2 on table par using 'preaggregate' as select id, city, min(city) from par group by id,city ") + assert(CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p2")(sqlContext.sparkSession).isHivePartitionTable) + } + + // Create aggregate table on partition with partition column as group by. 
+ test("test preaggregate table creation on partition table with partition col as group by") { + sql("create datamap p3 on table par using 'preaggregate' as select id, max(city) from par group by id,city ") + assert(CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p3")(sqlContext.sparkSession).isHivePartitionTable) + } + + // Create aggregate table on partition without partition column. + test("test preaggregate table creation on partition table without partition column") { + sql("create datamap p4 on table par using 'preaggregate' as select name, count(id) from par group by name ") + assert(!CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p4")(sqlContext.sparkSession).isHivePartitionTable) + } + + test("test data correction in aggregate table when partition column is used") { + sql("create datamap p1 on table maintable using 'preaggregate' as select id, sum(age) from maintable group by id, age") + checkAnswer(sql("select * from maintable_p1"), + Seq(Row(1,31,31), + Row(2,27,27), + Row(3,70,35), + Row(4,26,26), + Row(4,29,29))) + preAggTableValidator(sql("select id, sum(age) from maintable group by id, age").queryExecution.analyzed, "maintable_p1") + sql("drop datamap p1 on table maintable") + } + + test("test data correction in aggregate table when partition column is not used") { + sql("create datamap p2 on table maintable using 'preaggregate' as select id, max(age) from maintable group by id") + checkAnswer(sql("select * from maintable_p2"), + Seq(Row(1,31), + Row(2,27), + Row(3,35), + Row(4,29))) + preAggTableValidator(sql("select id, max(age) from maintable group by id").queryExecution.analyzed, "maintable_p2") + sql("drop datamap p2 on table maintable") + } + + test("test data correction with insert overwrite") { + sql("drop table if exists partitionone") + sql( + """ + | CREATE TABLE if not exists partitionone (empname String) + | PARTITIONED BY (year int, month int,day int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql("create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month,day") + sql("insert into partitionone values('k',2014,1,1)") + sql("insert overwrite table partitionone values('v',2014,1,1)") + checkAnswer(sql("select * from partitionone"), Seq(Row("v",2014,1,1))) + checkAnswer(sql("select * from partitionone_p1"), Seq(Row("v",2014,2014,1,1))) + } + + test("test data correction with insert overwrite on different value") { + sql("drop table if exists partitionone") + sql( + """ + | CREATE TABLE if not exists partitionone (empname String) + | PARTITIONED BY (year int, month int,day int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql("create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month,day") + sql("insert into partitionone values('k',2014,1,1)") + sql("insert overwrite table partitionone values('v',2015,1,1)") + checkAnswer(sql("select * from partitionone"), Seq(Row("k",2014,1,1), Row("v",2015,1,1))) --- End diff -- after insert overwrite operation only one row should come (Row("v",2015,1,1))....Is my understanding correct? --- |
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178767625 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala --- @@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events._ import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent} +object AlterTableDropPartitionPreStatusListener extends OperationEventListener { + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override protected def onEvent(event: Event, + operationContext: OperationContext) = { + val preStatusListener = event.asInstanceOf[AlterTableDropPartitionPreStatusEvent] + val carbonTable = preStatusListener.carbonTable + val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands") + if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) { + val childCommands = + childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]] + childCommands.foreach(_.processData(SparkSession.getActiveSession.get)) + } + } +} + +trait CommitHelper { + + val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName) + + protected def markInProgressSegmentAsDeleted(tableStatusFile: String, + operationContext: OperationContext, + carbonTable: CarbonTable): Unit = { + val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile) + val segmentBeingLoaded = + operationContext.getProperty(carbonTable.getTableUniqueName + "_Segment").toString + val newDetails = loadMetaDataDetails.collect { + case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) => + detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE) + detail + case others => others + } + SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails) + } + + /** + * Used to rename table status files for commit operation. + */ + protected def renameDataMapTableStatusFiles(sourceFileName: String, + destinationFileName: String, uuid: String): Boolean = { + val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName) + val newCarbonFile = FileFactory.getCarbonFile(destinationFileName) + if (oldCarbonFile.exists() && newCarbonFile.exists()) { + val backUpPostFix = if (uuid.nonEmpty) { + "_backup_" + uuid + } else { --- End diff -- Write a comment to explain when uuid will be empty and when it will be non empty --- |
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178780592 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala --- @@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events._ import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent} +object AlterTableDropPartitionPreStatusListener extends OperationEventListener { + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override protected def onEvent(event: Event, + operationContext: OperationContext) = { + val preStatusListener = event.asInstanceOf[AlterTableDropPartitionPreStatusEvent] + val carbonTable = preStatusListener.carbonTable + val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands") + if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) { + val childCommands = + childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]] + childCommands.foreach(_.processData(SparkSession.getActiveSession.get)) + } + } +} + +trait CommitHelper { + + val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName) + + protected def markInProgressSegmentAsDeleted(tableStatusFile: String, + operationContext: OperationContext, + carbonTable: CarbonTable): Unit = { + val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile) + val segmentBeingLoaded = + operationContext.getProperty(carbonTable.getTableUniqueName + "_Segment").toString + val newDetails = loadMetaDataDetails.collect { + case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) => + detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE) + detail + case others => others + } + SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails) + } + + /** + * Used to rename table status files for commit operation. + */ + protected def renameDataMapTableStatusFiles(sourceFileName: String, + destinationFileName: String, uuid: String): Boolean = { + val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName) + val newCarbonFile = FileFactory.getCarbonFile(destinationFileName) + if (oldCarbonFile.exists() && newCarbonFile.exists()) { + val backUpPostFix = if (uuid.nonEmpty) { + "_backup_" + uuid + } else { + "" + } + LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix}") + if (newCarbonFile.renameForce(destinationFileName + backUpPostFix)) { + LOGGER.info(s"Renaming $oldCarbonFile to $destinationFileName") + oldCarbonFile.renameForce(destinationFileName) + } else { + LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix} failed") + false + } + } else { + false + } + } + + /** + * Used to remove table status files with UUID and segment folders. 
+ */ + protected def cleanUpStaleTableStatusFiles( + childTables: Seq[CarbonTable], + operationContext: OperationContext, + uuid: String): Unit = { + childTables.foreach { childTable => + val metaDataDir = FileFactory.getCarbonFile( + CarbonTablePath.getMetadataPath(childTable.getTablePath)) + val tableStatusFiles = metaDataDir.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = { + file.getName.contains(uuid) || file.getName.contains("backup") + } + }) + tableStatusFiles.foreach(_.delete()) + } + } +} + +object AlterTableDropPartitionPostStatusListener extends OperationEventListener with CommitHelper { + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override protected def onEvent(event: Event, + operationContext: OperationContext) = { + val postStatusListener = event.asInstanceOf[AlterTableDropPartitionPostStatusEvent] + val carbonTable = postStatusListener.carbonTable + val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands") + val uuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString + if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) { + val childCommands = + childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]] + val renamedDataMaps = childCommands.takeWhile { + childCommand => + val childCarbonTable = childCommand.table + val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID( + childCarbonTable.getTablePath, uuid) + // Generate table status file name without UUID, forExample: tablestatus + val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath( + childCarbonTable.getTablePath) + renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid) + } + // if true then the commit for one of the child tables has failed + val commitFailed = renamedDataMaps.lengthCompare(childCommands.length) != 0 + if (commitFailed) { + LOGGER.warn("Reverting table status file to original state") --- End diff -- Make it as info or error --- |
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178787300

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
@@ -62,8 +63,25 @@ case class PreAggregateTableHelper(
     val df = sparkSession.sql(updatedQuery)
     val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
       df.logicalPlan, queryString)
+    val partitionInfo = parentTable.getPartitionInfo
     val fields = fieldRelationMap.keySet.toSeq
     val tableProperties = mutable.Map[String, String]()
+    val parentPartitionColumns = if (parentTable.isHivePartitionTable) {
+      partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
+    } else {
+      Seq()
+    }
+    // Generate child table partition columns in the same order as the parent table.
+    val partitionerFields = fieldRelationMap.collect {
+      case (field, dataMapField) if parentPartitionColumns
+        .exists(parentCol =>
+          parentCol.equals(dataMapField.columnTableRelationList.get.head.parentColumnName) &&
+          dataMapField.aggregateFunction.isEmpty) =>
+        (PartitionerField(field.name.get,
+          field.dataType,
+          field.columnComment), parentPartitionColumns
+          .indexOf(dataMapField.columnTableRelationList.get.head.parentColumnName))
+    }.toSeq.sortBy(_._2).map(_._1)
--- End diff --

I think the sortBy and map operations are not required here, since the child datamap ordering already follows the parent table's partition-column ordering.
--- |
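Assuming, as the comment states, that fieldRelationMap already yields the fields in the parent's partition-column order, the collect can drop the positional index entirely (a sketch of the simplification, not the committed code):

    val partitionerFields = fieldRelationMap.collect {
      case (field, dataMapField) if parentPartitionColumns.exists(parentCol =>
        parentCol.equals(dataMapField.columnTableRelationList.get.head.parentColumnName) &&
          dataMapField.aggregateFunction.isEmpty) =>
        // Ordering is inherited from the parent table, so no sortBy is needed.
        PartitionerField(field.name.get, field.dataType, field.columnComment)
    }.toSeq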
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178784636 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala --- @@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events._ import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent} +object AlterTableDropPartitionPreStatusListener extends OperationEventListener { + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override protected def onEvent(event: Event, + operationContext: OperationContext) = { + val preStatusListener = event.asInstanceOf[AlterTableDropPartitionPreStatusEvent] + val carbonTable = preStatusListener.carbonTable + val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands") + if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) { + val childCommands = + childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]] + childCommands.foreach(_.processData(SparkSession.getActiveSession.get)) + } + } +} + +trait CommitHelper { + + val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName) + + protected def markInProgressSegmentAsDeleted(tableStatusFile: String, + operationContext: OperationContext, + carbonTable: CarbonTable): Unit = { + val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile) + val segmentBeingLoaded = + operationContext.getProperty(carbonTable.getTableUniqueName + "_Segment").toString + val newDetails = loadMetaDataDetails.collect { + case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) => + detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE) + detail + case others => others + } + SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails) + } + + /** + * Used to rename table status files for commit operation. + */ + protected def renameDataMapTableStatusFiles(sourceFileName: String, + destinationFileName: String, uuid: String): Boolean = { + val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName) + val newCarbonFile = FileFactory.getCarbonFile(destinationFileName) + if (oldCarbonFile.exists() && newCarbonFile.exists()) { + val backUpPostFix = if (uuid.nonEmpty) { + "_backup_" + uuid + } else { + "" + } + LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix}") + if (newCarbonFile.renameForce(destinationFileName + backUpPostFix)) { + LOGGER.info(s"Renaming $oldCarbonFile to $destinationFileName") + oldCarbonFile.renameForce(destinationFileName) + } else { + LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix} failed") + false + } + } else { + false + } + } + + /** + * Used to remove table status files with UUID and segment folders. 
+ */ + protected def cleanUpStaleTableStatusFiles( + childTables: Seq[CarbonTable], + operationContext: OperationContext, + uuid: String): Unit = { + childTables.foreach { childTable => + val metaDataDir = FileFactory.getCarbonFile( + CarbonTablePath.getMetadataPath(childTable.getTablePath)) + val tableStatusFiles = metaDataDir.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = { + file.getName.contains(uuid) || file.getName.contains("backup") + } + }) + tableStatusFiles.foreach(_.delete()) + } + } +} + +object AlterTableDropPartitionPostStatusListener extends OperationEventListener with CommitHelper { + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override protected def onEvent(event: Event, + operationContext: OperationContext) = { + val postStatusListener = event.asInstanceOf[AlterTableDropPartitionPostStatusEvent] + val carbonTable = postStatusListener.carbonTable + val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands") + val uuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString + if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) { + val childCommands = + childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]] + val renamedDataMaps = childCommands.takeWhile { + childCommand => + val childCarbonTable = childCommand.table + val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID( + childCarbonTable.getTablePath, uuid) + // Generate table status file name without UUID, forExample: tablestatus + val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath( + childCarbonTable.getTablePath) + renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid) + } + // if true then the commit for one of the child tables has failed + val commitFailed = renamedDataMaps.lengthCompare(childCommands.length) != 0 + if (commitFailed) { + LOGGER.warn("Reverting table status file to original state") + renamedDataMaps.foreach { + command => + val carbonTable = command.table + // rename the backup tablestatus i.e tablestatus_backup_UUID to tablestatus + val backupTableSchemaPath = + CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + "_backup_" + uuid + val tableSchemaPath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "") + } + } + // after success/failure of commit delete all tablestatus files with UUID in their names. 
+ // if commit failed then remove the segment directory + cleanUpStaleTableStatusFiles(childCommands.map(_.table), + operationContext, + uuid) + if (commitFailed) { + sys.error("Failed to update table status for pre-aggregate table") + } + + } + } +} + +object AlterTableDropPartitionMetaListener extends OperationEventListener{ + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override protected def onEvent(event: Event, + operationContext: OperationContext) = { + val dropPartitionEvent = event.asInstanceOf[AlterTableDropPartitionMetaEvent] + val parentCarbonTable = dropPartitionEvent.parentCarbonTable + val partitionsToBeDropped = dropPartitionEvent.specs.flatMap(_.keys) + val sparkSession = SparkSession.getActiveSession.get + if (parentCarbonTable.hasAggregationDataMap) { + // used as a flag to block direct drop partition on aggregate tables fired by the user + operationContext.setProperty("isInternalDropCall", "true") + // Filter out all the tables which dont have the partition being dropped. + val childTablesWithoutPartitionColumns = + parentCarbonTable.getTableInfo.getDataMapSchemaList.asScala.filter { dataMapSchema => + val childColumns = dataMapSchema.getChildSchema.getListOfColumns.asScala + val partitionColExists = partitionsToBeDropped.forall { + partition => + childColumns.exists { childColumn => + childColumn.getAggFunction.isEmpty && + childColumn.getParentColumnTableRelations.asScala.head.getColumnName. + equals(partition) + } + } + !partitionColExists + } + if (childTablesWithoutPartitionColumns.nonEmpty) { + throw new MetadataProcessException(s"Cannot drop partition as one of the partition is not" + + s" participating in the following datamaps ${ + childTablesWithoutPartitionColumns.toList.map(_.getChildSchema.getTableName) --- End diff -- Make sure this list is printed to console output --- |
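One way to make sure the offending datamap names show up readably in the thrown message (a sketch; the mkString formatting is an assumption):

    throw new MetadataProcessException(
      "Cannot drop partition as the partition column is not participating in " +
      "the following datamaps: " +
      childTablesWithoutPartitionColumns.map(_.getChildSchema.getTableName).mkString(", "))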
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178775449 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala --- @@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events._ import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent} +object AlterTableDropPartitionPreStatusListener extends OperationEventListener { + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override protected def onEvent(event: Event, + operationContext: OperationContext) = { + val preStatusListener = event.asInstanceOf[AlterTableDropPartitionPreStatusEvent] + val carbonTable = preStatusListener.carbonTable + val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands") + if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) { + val childCommands = + childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]] + childCommands.foreach(_.processData(SparkSession.getActiveSession.get)) + } + } +} + +trait CommitHelper { + + val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName) + + protected def markInProgressSegmentAsDeleted(tableStatusFile: String, + operationContext: OperationContext, + carbonTable: CarbonTable): Unit = { + val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile) + val segmentBeingLoaded = + operationContext.getProperty(carbonTable.getTableUniqueName + "_Segment").toString + val newDetails = loadMetaDataDetails.collect { + case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) => + detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE) + detail + case others => others + } + SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails) + } + + /** + * Used to rename table status files for commit operation. + */ + protected def renameDataMapTableStatusFiles(sourceFileName: String, + destinationFileName: String, uuid: String): Boolean = { + val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName) + val newCarbonFile = FileFactory.getCarbonFile(destinationFileName) + if (oldCarbonFile.exists() && newCarbonFile.exists()) { + val backUpPostFix = if (uuid.nonEmpty) { + "_backup_" + uuid + } else { + "" + } + LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix}") + if (newCarbonFile.renameForce(destinationFileName + backUpPostFix)) { + LOGGER.info(s"Renaming $oldCarbonFile to $destinationFileName") + oldCarbonFile.renameForce(destinationFileName) + } else { + LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix} failed") + false + } + } else { + false + } + } + + /** + * Used to remove table status files with UUID and segment folders. 
+ */ + protected def cleanUpStaleTableStatusFiles( + childTables: Seq[CarbonTable], + operationContext: OperationContext, + uuid: String): Unit = { + childTables.foreach { childTable => + val metaDataDir = FileFactory.getCarbonFile( + CarbonTablePath.getMetadataPath(childTable.getTablePath)) + val tableStatusFiles = metaDataDir.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = { + file.getName.contains(uuid) || file.getName.contains("backup") + } + }) + tableStatusFiles.foreach(_.delete()) + } + } +} + +object AlterTableDropPartitionPostStatusListener extends OperationEventListener with CommitHelper { + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override protected def onEvent(event: Event, + operationContext: OperationContext) = { + val postStatusListener = event.asInstanceOf[AlterTableDropPartitionPostStatusEvent] + val carbonTable = postStatusListener.carbonTable + val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands") + val uuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString + if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) { + val childCommands = + childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]] + val renamedDataMaps = childCommands.takeWhile { + childCommand => + val childCarbonTable = childCommand.table + val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID( + childCarbonTable.getTablePath, uuid) + // Generate table status file name without UUID, forExample: tablestatus + val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath( + childCarbonTable.getTablePath) + renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid) + } + // if true then the commit for one of the child tables has failed + val commitFailed = renamedDataMaps.lengthCompare(childCommands.length) != 0 + if (commitFailed) { + LOGGER.warn("Reverting table status file to original state") + renamedDataMaps.foreach { + command => + val carbonTable = command.table + // rename the backup tablestatus i.e tablestatus_backup_UUID to tablestatus + val backupTableSchemaPath = + CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + "_backup_" + uuid + val tableSchemaPath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "") + } + } + // after success/failure of commit delete all tablestatus files with UUID in their names. + // if commit failed then remove the segment directory + cleanUpStaleTableStatusFiles(childCommands.map(_.table), --- End diff -- 1. Make this call in finally block 2. We need to have a mechanism for clean up during clean operation. **For this we can raise a jira to track the issue** --- |
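A sketch of point 1, with the cleanup moved into a finally block so that stale tablestatus files are removed even if the revert itself throws (names as in the diff above):

    try {
      if (commitFailed) {
        LOGGER.error("Reverting table status file to original state")
        renamedDataMaps.foreach { command =>
          val carbonTable = command.table
          // rename the backup tablestatus, i.e. tablestatus_backup_UUID, to tablestatus
          val backupTableSchemaPath =
            CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + "_backup_" + uuid
          val tableSchemaPath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
          renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
        }
      }
    } finally {
      // Runs on success and failure alike: delete every tablestatus file
      // carrying the uuid or a backup suffix.
      cleanUpStaleTableStatusFiles(childCommands.map(_.table), operationContext, uuid)
    }
    if (commitFailed) {
      sys.error("Failed to update table status for pre-aggregate table")
    }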
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178795973 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala --- @@ -0,0 +1,488 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain id copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.carbondata.spark.testsuite.standardpartition + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, Row} +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +class StandardPartitionWithPreaggregateTestCase extends QueryTest with BeforeAndAfterAll { + + val testData = s"$resourcesPath/sample.csv" + + override def beforeAll(): Unit = { + sql("drop database if exists partition_preaggregate cascade") + sql("create database partition_preaggregate") + sql("use partition_preaggregate") + sql( + """ + | CREATE TABLE par(id INT, name STRING, age INT) PARTITIONED BY(city STRING) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql( + """ + | CREATE TABLE maintable(id int, name string, city string) partitioned by (age int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + } + + override def afterAll(): Unit = { + sql("drop database if exists partition_preaggregate cascade") + sql("use default") + } + + // Create aggregate table on partition with partition column in aggregation only. + test("test preaggregate table creation on partition table with partition col as aggregation") { + sql("create datamap p1 on table par using 'preaggregate' as select id, sum(city) from par group by id") + assert(!CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p1")(sqlContext.sparkSession).isHivePartitionTable) + } + + // Create aggregate table on partition with partition column in projection and aggregation only. + test("test preaggregate table creation on partition table with partition col as projection") { + sql("create datamap p2 on table par using 'preaggregate' as select id, city, min(city) from par group by id,city ") + assert(CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p2")(sqlContext.sparkSession).isHivePartitionTable) + } + + // Create aggregate table on partition with partition column as group by. 
+ test("test preaggregate table creation on partition table with partition col as group by") { + sql("create datamap p3 on table par using 'preaggregate' as select id, max(city) from par group by id,city ") + assert(CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p3")(sqlContext.sparkSession).isHivePartitionTable) + } + + // Create aggregate table on partition without partition column. + test("test preaggregate table creation on partition table without partition column") { + sql("create datamap p4 on table par using 'preaggregate' as select name, count(id) from par group by name ") + assert(!CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p4")(sqlContext.sparkSession).isHivePartitionTable) + } + + test("test data correction in aggregate table when partition column is used") { + sql("create datamap p1 on table maintable using 'preaggregate' as select id, sum(age) from maintable group by id, age") + checkAnswer(sql("select * from maintable_p1"), + Seq(Row(1,31,31), + Row(2,27,27), + Row(3,70,35), + Row(4,26,26), + Row(4,29,29))) + preAggTableValidator(sql("select id, sum(age) from maintable group by id, age").queryExecution.analyzed, "maintable_p1") + sql("drop datamap p1 on table maintable") + } + + test("test data correction in aggregate table when partition column is not used") { + sql("create datamap p2 on table maintable using 'preaggregate' as select id, max(age) from maintable group by id") + checkAnswer(sql("select * from maintable_p2"), + Seq(Row(1,31), + Row(2,27), + Row(3,35), + Row(4,29))) + preAggTableValidator(sql("select id, max(age) from maintable group by id").queryExecution.analyzed, "maintable_p2") + sql("drop datamap p2 on table maintable") + } + + test("test data correction with insert overwrite") { + sql("drop table if exists partitionone") + sql( + """ + | CREATE TABLE if not exists partitionone (empname String) + | PARTITIONED BY (year int, month int,day int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql("create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month,day") + sql("insert into partitionone values('k',2014,1,1)") + sql("insert overwrite table partitionone values('v',2014,1,1)") + checkAnswer(sql("select * from partitionone"), Seq(Row("v",2014,1,1))) + checkAnswer(sql("select * from partitionone_p1"), Seq(Row("v",2014,2014,1,1))) + } + + test("test data correction with insert overwrite on different value") { + sql("drop table if exists partitionone") + sql( + """ + | CREATE TABLE if not exists partitionone (empname String) + | PARTITIONED BY (year int, month int,day int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql("create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month,day") + sql("insert into partitionone values('k',2014,1,1)") + sql("insert overwrite table partitionone values('v',2015,1,1)") + checkAnswer(sql("select * from partitionone"), Seq(Row("k",2014,1,1), Row("v",2015,1,1))) --- End diff -- In case of partition, overwrite would on per partition level not on table level. So on the matching partitions would be overwritten. --- |
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178801685 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala --- @@ -62,8 +63,25 @@ case class PreAggregateTableHelper( val df = sparkSession.sql(updatedQuery) val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes( df.logicalPlan, queryString) + val partitionInfo = parentTable.getPartitionInfo val fields = fieldRelationMap.keySet.toSeq val tableProperties = mutable.Map[String, String]() + val parentPartitionColumns = if (parentTable.isHivePartitionTable) { + partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName) + } else { + Seq() + } + // Generate child table partition columns in the same order as the parent table. + val partitionerFields = fieldRelationMap.collect { + case (field, dataMapField) if parentPartitionColumns + .exists(parentCol => + parentCol.equals(dataMapField.columnTableRelationList.get.head.parentColumnName) && + dataMapField.aggregateFunction.isEmpty) => + (PartitionerField(field.name.get, + field.dataType, + field.columnComment), parentPartitionColumns + .indexOf(dataMapField.columnTableRelationList.get.head.parentColumnName)) + }.toSeq.sortBy(_._2).map(_._1) --- End diff -- removed sortBy --- |
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178801725 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala --- @@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events._ import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent} +object AlterTableDropPartitionPreStatusListener extends OperationEventListener { + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override protected def onEvent(event: Event, + operationContext: OperationContext) = { + val preStatusListener = event.asInstanceOf[AlterTableDropPartitionPreStatusEvent] + val carbonTable = preStatusListener.carbonTable + val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands") + if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) { + val childCommands = + childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]] + childCommands.foreach(_.processData(SparkSession.getActiveSession.get)) + } + } +} + +trait CommitHelper { + + val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName) + + protected def markInProgressSegmentAsDeleted(tableStatusFile: String, + operationContext: OperationContext, + carbonTable: CarbonTable): Unit = { + val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile) + val segmentBeingLoaded = + operationContext.getProperty(carbonTable.getTableUniqueName + "_Segment").toString + val newDetails = loadMetaDataDetails.collect { + case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) => + detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE) + detail + case others => others + } + SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails) + } + + /** + * Used to rename table status files for commit operation. + */ + protected def renameDataMapTableStatusFiles(sourceFileName: String, + destinationFileName: String, uuid: String): Boolean = { + val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName) + val newCarbonFile = FileFactory.getCarbonFile(destinationFileName) + if (oldCarbonFile.exists() && newCarbonFile.exists()) { + val backUpPostFix = if (uuid.nonEmpty) { + "_backup_" + uuid + } else { + "" + } + LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix}") + if (newCarbonFile.renameForce(destinationFileName + backUpPostFix)) { + LOGGER.info(s"Renaming $oldCarbonFile to $destinationFileName") + oldCarbonFile.renameForce(destinationFileName) + } else { + LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix} failed") + false + } + } else { + false + } + } + + /** + * Used to remove table status files with UUID and segment folders. 
+ */ + protected def cleanUpStaleTableStatusFiles( + childTables: Seq[CarbonTable], + operationContext: OperationContext, + uuid: String): Unit = { + childTables.foreach { childTable => + val metaDataDir = FileFactory.getCarbonFile( + CarbonTablePath.getMetadataPath(childTable.getTablePath)) + val tableStatusFiles = metaDataDir.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = { + file.getName.contains(uuid) || file.getName.contains("backup") + } + }) + tableStatusFiles.foreach(_.delete()) + } + } +} + +object AlterTableDropPartitionPostStatusListener extends OperationEventListener with CommitHelper { + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override protected def onEvent(event: Event, + operationContext: OperationContext) = { + val postStatusListener = event.asInstanceOf[AlterTableDropPartitionPostStatusEvent] + val carbonTable = postStatusListener.carbonTable + val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands") + val uuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString + if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) { + val childCommands = + childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]] + val renamedDataMaps = childCommands.takeWhile { + childCommand => + val childCarbonTable = childCommand.table + val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID( + childCarbonTable.getTablePath, uuid) + // Generate table status file name without UUID, forExample: tablestatus + val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath( + childCarbonTable.getTablePath) + renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid) + } + // if true then the commit for one of the child tables has failed + val commitFailed = renamedDataMaps.lengthCompare(childCommands.length) != 0 + if (commitFailed) { + LOGGER.warn("Reverting table status file to original state") + renamedDataMaps.foreach { + command => + val carbonTable = command.table + // rename the backup tablestatus i.e tablestatus_backup_UUID to tablestatus + val backupTableSchemaPath = + CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + "_backup_" + uuid + val tableSchemaPath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "") + } + } + // after success/failure of commit delete all tablestatus files with UUID in their names. 
+ // if commit failed then remove the segment directory + cleanUpStaleTableStatusFiles(childCommands.map(_.table), + operationContext, + uuid) + if (commitFailed) { + sys.error("Failed to update table status for pre-aggregate table") + } + + } + } +} + +object AlterTableDropPartitionMetaListener extends OperationEventListener{ + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override protected def onEvent(event: Event, + operationContext: OperationContext) = { + val dropPartitionEvent = event.asInstanceOf[AlterTableDropPartitionMetaEvent] + val parentCarbonTable = dropPartitionEvent.parentCarbonTable + val partitionsToBeDropped = dropPartitionEvent.specs.flatMap(_.keys) + val sparkSession = SparkSession.getActiveSession.get + if (parentCarbonTable.hasAggregationDataMap) { + // used as a flag to block direct drop partition on aggregate tables fired by the user + operationContext.setProperty("isInternalDropCall", "true") + // Filter out all the tables which dont have the partition being dropped. + val childTablesWithoutPartitionColumns = + parentCarbonTable.getTableInfo.getDataMapSchemaList.asScala.filter { dataMapSchema => + val childColumns = dataMapSchema.getChildSchema.getListOfColumns.asScala + val partitionColExists = partitionsToBeDropped.forall { + partition => + childColumns.exists { childColumn => + childColumn.getAggFunction.isEmpty && + childColumn.getParentColumnTableRelations.asScala.head.getColumnName. + equals(partition) + } + } + !partitionColExists + } + if (childTablesWithoutPartitionColumns.nonEmpty) { + throw new MetadataProcessException(s"Cannot drop partition as one of the partition is not" + + s" participating in the following datamaps ${ + childTablesWithoutPartitionColumns.toList.map(_.getChildSchema.getTableName) --- End diff -- Verified --- |
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178801747 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala --- @@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events._ import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent} +object AlterTableDropPartitionPreStatusListener extends OperationEventListener { + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override protected def onEvent(event: Event, + operationContext: OperationContext) = { + val preStatusListener = event.asInstanceOf[AlterTableDropPartitionPreStatusEvent] + val carbonTable = preStatusListener.carbonTable + val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands") + if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) { + val childCommands = + childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]] + childCommands.foreach(_.processData(SparkSession.getActiveSession.get)) + } + } +} + +trait CommitHelper { + + val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName) + + protected def markInProgressSegmentAsDeleted(tableStatusFile: String, + operationContext: OperationContext, + carbonTable: CarbonTable): Unit = { + val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile) + val segmentBeingLoaded = + operationContext.getProperty(carbonTable.getTableUniqueName + "_Segment").toString + val newDetails = loadMetaDataDetails.collect { + case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) => + detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE) + detail + case others => others + } + SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails) + } + + /** + * Used to rename table status files for commit operation. + */ + protected def renameDataMapTableStatusFiles(sourceFileName: String, + destinationFileName: String, uuid: String): Boolean = { + val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName) + val newCarbonFile = FileFactory.getCarbonFile(destinationFileName) + if (oldCarbonFile.exists() && newCarbonFile.exists()) { + val backUpPostFix = if (uuid.nonEmpty) { + "_backup_" + uuid + } else { + "" + } + LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix}") + if (newCarbonFile.renameForce(destinationFileName + backUpPostFix)) { + LOGGER.info(s"Renaming $oldCarbonFile to $destinationFileName") + oldCarbonFile.renameForce(destinationFileName) + } else { + LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix} failed") + false + } + } else { + false + } + } + + /** + * Used to remove table status files with UUID and segment folders. 
+ */ + protected def cleanUpStaleTableStatusFiles( + childTables: Seq[CarbonTable], + operationContext: OperationContext, + uuid: String): Unit = { + childTables.foreach { childTable => + val metaDataDir = FileFactory.getCarbonFile( + CarbonTablePath.getMetadataPath(childTable.getTablePath)) + val tableStatusFiles = metaDataDir.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = { + file.getName.contains(uuid) || file.getName.contains("backup") + } + }) + tableStatusFiles.foreach(_.delete()) + } + } +} + +object AlterTableDropPartitionPostStatusListener extends OperationEventListener with CommitHelper { + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override protected def onEvent(event: Event, + operationContext: OperationContext) = { + val postStatusListener = event.asInstanceOf[AlterTableDropPartitionPostStatusEvent] + val carbonTable = postStatusListener.carbonTable + val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands") + val uuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString + if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) { + val childCommands = + childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]] + val renamedDataMaps = childCommands.takeWhile { + childCommand => + val childCarbonTable = childCommand.table + val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID( + childCarbonTable.getTablePath, uuid) + // Generate table status file name without UUID, forExample: tablestatus + val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath( + childCarbonTable.getTablePath) + renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid) + } + // if true then the commit for one of the child tables has failed + val commitFailed = renamedDataMaps.lengthCompare(childCommands.length) != 0 + if (commitFailed) { + LOGGER.warn("Reverting table status file to original state") --- End diff -- done --- |
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178801779
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
@@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events._
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
+object AlterTableDropPartitionPreStatusListener extends OperationEventListener {
+  /**
+   * Called on a specified event occurrence
+   *
+   * @param event
+   * @param operationContext
+   */
+  override protected def onEvent(event: Event,
+      operationContext: OperationContext) = {
+    val preStatusListener = event.asInstanceOf[AlterTableDropPartitionPreStatusEvent]
+    val carbonTable = preStatusListener.carbonTable
+    val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
+    if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
+      val childCommands =
+        childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
+      childCommands.foreach(_.processData(SparkSession.getActiveSession.get))
+    }
+  }
+}
+
+trait CommitHelper {
+
+  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+
+  protected def markInProgressSegmentAsDeleted(tableStatusFile: String,
+      operationContext: OperationContext,
+      carbonTable: CarbonTable): Unit = {
+    val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile)
+    val segmentBeingLoaded =
+      operationContext.getProperty(carbonTable.getTableUniqueName + "_Segment").toString
+    val newDetails = loadMetaDataDetails.collect {
+      case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) =>
+        detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
+        detail
+      case others => others
+    }
+    SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails)
+  }
+
+  /**
+   * Used to rename table status files for commit operation.
+   */
+  protected def renameDataMapTableStatusFiles(sourceFileName: String,
+      destinationFileName: String, uuid: String): Boolean = {
+    val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName)
+    val newCarbonFile = FileFactory.getCarbonFile(destinationFileName)
+    if (oldCarbonFile.exists() && newCarbonFile.exists()) {
+      val backUpPostFix = if (uuid.nonEmpty) {
+        "_backup_" + uuid
+      } else {
+        ""
+      }
+      LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix}")
+      if (newCarbonFile.renameForce(destinationFileName + backUpPostFix)) {
+        LOGGER.info(s"Renaming $oldCarbonFile to $destinationFileName")
+        oldCarbonFile.renameForce(destinationFileName)
+      } else {
+        LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix} failed")
+        false
+      }
+    } else {
+      false
+    }
+  }
+
+  /**
+   * Used to remove table status files with UUID and segment folders.
+   */
+  protected def cleanUpStaleTableStatusFiles(
+      childTables: Seq[CarbonTable],
+      operationContext: OperationContext,
+      uuid: String): Unit = {
+    childTables.foreach { childTable =>
+      val metaDataDir = FileFactory.getCarbonFile(
+        CarbonTablePath.getMetadataPath(childTable.getTablePath))
+      val tableStatusFiles = metaDataDir.listFiles(new CarbonFileFilter {
+        override def accept(file: CarbonFile): Boolean = {
+          file.getName.contains(uuid) || file.getName.contains("backup")
+        }
+      })
+      tableStatusFiles.foreach(_.delete())
+    }
+  }
+}
+
+object AlterTableDropPartitionPostStatusListener extends OperationEventListener with CommitHelper {
+  /**
+   * Called on a specified event occurrence
+   *
+   * @param event
+   * @param operationContext
+   */
+  override protected def onEvent(event: Event,
+      operationContext: OperationContext) = {
+    val postStatusListener = event.asInstanceOf[AlterTableDropPartitionPostStatusEvent]
+    val carbonTable = postStatusListener.carbonTable
+    val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
+    val uuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString
+    if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
+      val childCommands =
+        childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
+      val renamedDataMaps = childCommands.takeWhile { childCommand =>
+        val childCarbonTable = childCommand.table
+        val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID(
+          childCarbonTable.getTablePath, uuid)
+        // Generate table status file name without UUID, for example: tablestatus
+        val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
+          childCarbonTable.getTablePath)
+        renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
+      }
+      // if true then the commit for one of the child tables has failed
+      val commitFailed = renamedDataMaps.lengthCompare(childCommands.length) != 0
+      if (commitFailed) {
+        LOGGER.warn("Reverting table status file to original state")
+        renamedDataMaps.foreach { command =>
+          val carbonTable = command.table
+          // rename the backup tablestatus i.e. tablestatus_backup_UUID to tablestatus
+          val backupTableSchemaPath =
+            CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + "_backup_" + uuid
+          val tableSchemaPath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
+          renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
+        }
+      }
+      // after success/failure of commit delete all tablestatus files with UUID in their names.
+      // if commit failed then remove the segment directory
+      cleanUpStaleTableStatusFiles(childCommands.map(_.table),
--- End diff -- Added TODO ---
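The quoted CommitHelper hides the segment that was being loaded by flipping its status to MARKED_FOR_DELETE rather than deleting files in place. A self-contained sketch of that status-flip pass follows, with a plain case class standing in for CarbonData's LoadMetadataDetails; all names here are illustrative.

object MarkSegmentDeletedSketch {

  final case class LoadDetail(loadName: String, var status: String)

  // Flip the status of the segment being loaded; all other entries pass
  // through unchanged, mirroring the collect in markInProgressSegmentAsDeleted.
  def markDeleted(details: Array[LoadDetail], segmentBeingLoaded: String): Array[LoadDetail] = {
    details.collect {
      case detail if detail.loadName.equalsIgnoreCase(segmentBeingLoaded) =>
        detail.status = "MARKED_FOR_DELETE"
        detail
      case others => others
    }
  }

  def main(args: Array[String]): Unit = {
    val details = Array(LoadDetail("0", "SUCCESS"), LoadDetail("1", "INSERT_IN_PROGRESS"))
    // Prints: 0: SUCCESS, then 1: MARKED_FOR_DELETE
    markDeleted(details, "1").foreach(d => println(s"${d.loadName}: ${d.status}"))
  }
}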
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178801803
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
@@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events._
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
+object AlterTableDropPartitionPreStatusListener extends OperationEventListener {
+  /**
+   * Called on a specified event occurrence
+   *
+   * @param event
+   * @param operationContext
+   */
+  override protected def onEvent(event: Event,
+      operationContext: OperationContext) = {
+    val preStatusListener = event.asInstanceOf[AlterTableDropPartitionPreStatusEvent]
+    val carbonTable = preStatusListener.carbonTable
+    val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
+    if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
+      val childCommands =
+        childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
+      childCommands.foreach(_.processData(SparkSession.getActiveSession.get))
+    }
+  }
+}
+
+trait CommitHelper {
+
+  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+
+  protected def markInProgressSegmentAsDeleted(tableStatusFile: String,
+      operationContext: OperationContext,
+      carbonTable: CarbonTable): Unit = {
+    val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile)
+    val segmentBeingLoaded =
+      operationContext.getProperty(carbonTable.getTableUniqueName + "_Segment").toString
+    val newDetails = loadMetaDataDetails.collect {
+      case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) =>
+        detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
+        detail
+      case others => others
+    }
+    SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails)
+  }
+
+  /**
+   * Used to rename table status files for commit operation.
+   */
+  protected def renameDataMapTableStatusFiles(sourceFileName: String,
+      destinationFileName: String, uuid: String): Boolean = {
+    val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName)
+    val newCarbonFile = FileFactory.getCarbonFile(destinationFileName)
+    if (oldCarbonFile.exists() && newCarbonFile.exists()) {
+      val backUpPostFix = if (uuid.nonEmpty) {
+        "_backup_" + uuid
+      } else {
--- End diff -- done ---
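This thread is about the backup postfix: a non-empty uuid moves the live file aside as tablestatus_backup_<uuid> before the UUID file takes its place, while an empty uuid is used for the restore direction. A sketch of the same two-step rename using java.nio instead of CarbonFile.renameForce follows; it is a sketch under those assumptions, not the PR's implementation.

import java.nio.file.{Files, Path, Paths, StandardCopyOption}

object RenameWithBackupSketch {
  // Returns true only if both files exist and both moves succeed, matching
  // the Boolean contract of renameDataMapTableStatusFiles above.
  def renameWithBackup(source: Path, destination: Path, uuid: String): Boolean = {
    if (Files.exists(source) && Files.exists(destination)) {
      // Empty uuid => empty postfix => skip the backup step (restore case).
      val backUpPostFix = if (uuid.nonEmpty) "_backup_" + uuid else ""
      try {
        if (backUpPostFix.nonEmpty) {
          Files.move(destination, Paths.get(destination.toString + backUpPostFix),
            StandardCopyOption.REPLACE_EXISTING)
        }
        Files.move(source, destination, StandardCopyOption.REPLACE_EXISTING)
        true
      } catch {
        case _: java.io.IOException => false
      }
    } else {
      false
    }
  }
}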
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178801858
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---
@@ -147,6 +152,18 @@ case class CarbonAlterTableDropHivePartitionCommand(
       table.getDatabaseName,
       table.getTableName,
       locksToBeAcquired)(sparkSession)
+    // If flow is for child table then get the uuid from operation context.
+    // If flow is for parent table then generate uuid for child flows and set the uuid to ""
+    // for parent table
+    // If normal table then set uuid to "".
+    val uuid = if (table.isChildDataMap) {
+      Option(operationContext.getProperty("uuid")).getOrElse("").toString
--- End diff -- Logged the message ---
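The resolution here is to log when a child datamap flow finds no uuid set by its parent. The PR's exact logging line is not quoted in this thread, but the parent/child uuid handshake could be sketched as below, with a stand-in OperationContext; all names are illustrative except the "uuid" property key.

import java.util.UUID

object UuidHandshakeSketch {

  // Stand-in for CarbonData's OperationContext, just enough for the sketch.
  class OperationContext {
    private val props = scala.collection.mutable.Map[String, Any]()
    def getProperty(key: String): Any = props.getOrElse(key, null)
    def setProperty(key: String, value: Any): Unit = props(key) = value
  }

  def resolveUuid(ctx: OperationContext, isChildDataMap: Boolean,
      hasAggregationDataMap: Boolean): String = {
    if (isChildDataMap) {
      // Mandatory: the parent flow must already have set the uuid.
      Option(ctx.getProperty("uuid")) match {
        case Some(value) => value.toString
        case None =>
          // Reviewer's point: surface the broken handshake instead of
          // silently falling back to "".
          println("WARN: uuid not set by parent table for child datamap flow")
          ""
      }
    } else if (hasAggregationDataMap) {
      // Parent table: generate the uuid for the child flows, use "" itself.
      ctx.setProperty("uuid", UUID.randomUUID().toString)
      ""
    } else {
      "" // normal table
    }
  }
}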
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178801883
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---
@@ -136,6 +140,7 @@ case class CarbonAlterTableDropHivePartitionCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     var locks = List.empty[ICarbonLock]
     val uniqueId = System.currentTimeMillis().toString
+    val childCommands = operationContext.getProperty("dropPartitionCommands")
--- End diff -- done ---
In reply to this post by qiuchenjian-2
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178801930
--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala ---
@@ -176,6 +179,17 @@ class TestPreAggregateCompaction extends QueryTest with BeforeAndAfterEach with
     segmentNamesSum.sorted should equal (Array("0", "0.1", "0.2", "1", "2", "3", "4", "5", "6", "7"))
   }
 
+  test("test auto compaction on aggregate table") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    val segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString)
+    segmentNamesSum.sorted should equal (Array("0", "0.1", "1", "2", "3"))
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
--- End diff -- added in afterAll ---
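The fix the author describes (resetting the property in afterAll rather than at the end of the test body) would look roughly like the following inside the suite; DEFAULT_ENABLE_AUTO_LOAD_MERGE is assumed to be the name of the default-value constant the earlier review comment asked for, not quoted from the PR.

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

// Runs once after all tests in the suite: restore the merge property to its
// default even if the auto-compaction test fails midway, so later suites
// are not affected. (Constant name assumed, not quoted from the PR.)
override def afterAll(): Unit = {
  CarbonProperties.getInstance().addProperty(
    CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
    CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
}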
In reply to this post by qiuchenjian-2
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3565/
---