ajantha-bhat commented on a change in pull request #4013: URL: https://github.com/apache/carbondata/pull/4013#discussion_r534727593 ########## File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ########## @@ -1044,7 +1045,7 @@ public static void cleanSegments(CarbonTable table, Long fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(indexFile .substring(indexFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1, indexFile.length() - CarbonTablePath.INDEX_FILE_EXT.length())); - if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(fileTimestamp) || forceDelete) { + if (TrashUtil.isTrashDataTimeout(fileTimestamp) || forceDelete) { Review comment: Also update this behavior change in the documentation: expiry of `max.query.execution.time` alone will no longer delete the data; the trash retention time has to expire as well.
ajantha-bhat commented on a change in pull request #4013: URL: https://github.com/apache/carbondata/pull/4013#discussion_r534730624 ########## File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java ########## @@ -146,8 +146,8 @@ public static void cleanStaleSegmentsForPartitionTable(CarbonTable carbonTable) * in the metadata folder and is not present in the table status file is considered as a * stale segment. Only comparing from tablestatus file, not checking tablestatus.history file */ - private static void getStaleSegmentFiles(CarbonTable carbonTable, List<String> staleSegmentFiles, - List<String> redundantSegmentFile) { + private static void collectStaleSegmentFiles(CarbonTable carbonTable, Review comment: I suggest that `getStaleSegmentFiles` is fine as a name, but create the list inside the function and return it. Passing a list as an argument makes sense only when the list has pre-existing entries or is used in recursion.
QiangCai commented on a change in pull request #4013: URL: https://github.com/apache/carbondata/pull/4013#discussion_r534742148 ########## File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java ########## @@ -146,8 +146,8 @@ public static void cleanStaleSegmentsForPartitionTable(CarbonTable carbonTable) * in the metadata folder and is not present in the table status file is considered as a * stale segment. Only comparing from tablestatus file, not checking tablestatus.history file */ - private static void getStaleSegmentFiles(CarbonTable carbonTable, List<String> staleSegmentFiles, - List<String> redundantSegmentFile) { + private static void collectStaleSegmentFiles(CarbonTable carbonTable, Review comment: this method needs to return two lists, so it passes the two lists as parameters. Since it returns void, `collect` is better than `get`.
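For context on this thread: a Java method cannot return two values directly, which is why the two result lists are passed in as out-parameters. A hypothetical alternative, sketched below (the holder type, method body, and sample entries are illustrative assumptions, not code from the PR), is to create both lists inside the method and return them in a small result object:

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical sketch only: return both lists through a small holder object
    // instead of filling two caller-supplied out-parameter lists.
    public class StaleSegmentScanSketch {

      static final class StaleSegmentResult {
        final List<String> staleSegmentFiles = new ArrayList<>();
        final List<String> redundantSegmentFiles = new ArrayList<>();
      }

      // Stand-in for getStaleSegmentFiles: the real method scans the segment
      // metadata folder and compares it against the tablestatus file.
      static StaleSegmentResult collectStaleSegmentFiles() {
        StaleSegmentResult result = new StaleSegmentResult();
        result.staleSegmentFiles.add("1_1607000000000.segment");     // assumed sample value
        result.redundantSegmentFiles.add("0_1606000000000.segment"); // assumed sample value
        return result;
      }

      public static void main(String[] args) {
        StaleSegmentResult result = collectStaleSegmentFiles();
        System.out.println("stale: " + result.staleSegmentFiles);
        System.out.println("redundant: " + result.redundantSegmentFiles);
      }
    }

Either style works; the out-parameter version avoids the extra holder type, which is presumably why the PR keeps the two parameters and only renames the method to `collectStaleSegmentFiles`.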
ajantha-bhat commented on a change in pull request #4013: URL: https://github.com/apache/carbondata/pull/4013#discussion_r534743440 ########## File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java ########## @@ -192,12 +192,9 @@ public static void emptyTrash(String tablePath) { } /** - * This will tell whether the trash retention time has expired or not - * - * @param fileTimestamp - * @return + * whether trash data inside of .Trash folder is time out Review comment: now we are using this for data outside the trash folder also, so change the description to: `returns true, if the current timestamp has expired based on carbon.trash.retention.days`
QiangCai commented on a change in pull request #4013: URL: https://github.com/apache/carbondata/pull/4013#discussion_r534744818 ########## File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java ########## @@ -208,6 +205,14 @@ public static boolean isTrashRetentionTimeoutExceeded(long fileTimestamp) { return difference > retentionMilliSeconds; } + /** + * whether trash data outside of .Trash folder is time out + */ + public static boolean isTrashDataTimeout(long fileTimestamp) { + return isTrashRetentionTimeoutExceeded(fileTimestamp) && Review comment: isTrashRetentionTimeoutExceeded: whether trash data inside the .Trash folder has timed out. isTrashDataTimeout: whether trash data outside the .Trash folder has timed out. TrashRetentionTimeout can be 0, but MaxQueryTimeout cannot.
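To make the relationship between the two checks concrete, here is a minimal, self-contained Java sketch. It is not the PR's actual code: the constant values are assumptions standing in for the CarbonProperties lookups of `carbon.trash.retention.days` and `max.query.execution.time`.

    import java.util.concurrent.TimeUnit;

    // Minimal sketch of the two timeout checks and how the PR composes them.
    public class TrashTimeoutSketch {

      // Assumed values for illustration only; CarbonData reads these from
      // CarbonProperties (carbon.trash.retention.days, max.query.execution.time).
      static final long RETENTION_MILLIS = TimeUnit.DAYS.toMillis(7);
      static final long MAX_QUERY_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(60);

      // Data inside the .Trash folder: only the retention window matters,
      // and the retention time may legitimately be configured as 0.
      static boolean isTrashRetentionTimeoutExceeded(long fileTimestamp) {
        return System.currentTimeMillis() - fileTimestamp > RETENTION_MILLIS;
      }

      static boolean isMaxQueryTimeoutExceeded(long fileTimestamp) {
        return System.currentTimeMillis() - fileTimestamp > MAX_QUERY_TIMEOUT_MILLIS;
      }

      // Data outside the .Trash folder: BOTH windows must have passed,
      // which is the behavior change discussed at the top of this page.
      static boolean isTrashDataTimeout(long fileTimestamp) {
        return isTrashRetentionTimeoutExceeded(fileTimestamp)
            && isMaxQueryTimeoutExceeded(fileTimestamp);
      }

      public static void main(String[] args) {
        long eightDaysOld = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(8);
        System.out.println(isTrashDataTimeout(eightDaysOld)); // true under the assumed values
      }
    }

Under this composition, the gate in `SegmentFileStore.cleanSegments` becomes `isTrashDataTimeout(fileTimestamp) || forceDelete`, matching the diff in the first message on this page.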
QiangCai commented on a change in pull request #4013: URL: https://github.com/apache/carbondata/pull/4013#discussion_r534745677 ########## File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ########## @@ -1044,7 +1045,7 @@ public static void cleanSegments(CarbonTable table, Long fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(indexFile .substring(indexFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1, indexFile.length() - CarbonTablePath.INDEX_FILE_EXT.length())); - if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(fileTimestamp) || forceDelete) { + if (TrashUtil.isTrashDataTimeout(fileTimestamp) || forceDelete) { Review comment: yes, that is exactly the requirement.
QiangCai commented on a change in pull request #4013: URL: https://github.com/apache/carbondata/pull/4013#discussion_r534746030 ########## File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java ########## @@ -192,12 +192,9 @@ public static void emptyTrash(String tablePath) { } /** - * This will tell whether the trash retention time has expired or not - * - * @param fileTimestamp - * @return + * whether trash data inside of .Trash folder is time out Review comment: no, can you check the other method? Also, this method is private and is only used by TrashUtil to process the trash folder.
ajantha-bhat commented on a change in pull request #4013: URL: https://github.com/apache/carbondata/pull/4013#discussion_r534771387 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala ########## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.trash + +import scala.collection.JavaConverters._ + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.SegmentFileStore +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.statusmanager.SegmentStatusManager +import org.apache.carbondata.core.util.{CarbonProperties, CleanFilesUtil, TrashUtil} + +/** + * This object will manage the following data. + * 1. .Trash folder + * 2. stale segments without metadata + * 3. expired segments (MARKED_FOR_DELETE, Compacted, In Progress) + */ +object DataTrashManager { + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + /** + * clean garbage data + * 1. clean .Trash folder + * 2. clean stale segments without metadata + * 3. clean expired segments (MARKED_FOR_DELETE, Compacted, In Progress) + * + * @param carbonTable : CarbonTable Object + * @param partitionSpecs : Hive Partitions details + */ + def cleanGarbageData( + carbonTable: CarbonTable, + force: Boolean = false, + partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = { + var carbonCleanFilesLock: ICarbonLock = null + val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier + try { + val errorMsg = "Clean files request is failed for " + + s"${ carbonTable.getQualifiedName }" + + ". Not able to acquire the clean files lock due to another clean files " + + "operation is running in the background." + carbonCleanFilesLock = CarbonLockUtil.getLockObject(absoluteTableIdentifier, + LockUsage.CLEAN_FILES_LOCK, errorMsg) + // step 1: clean trash folder + cleanTrashFolder(carbonTable, force) Review comment: `checkAndCleanTrashFolder` would be a better name, as we don't always clean. The current function name gives the impression that we are always cleaning the trash folder here.
ajantha-bhat commented on a change in pull request #4013: URL: https://github.com/apache/carbondata/pull/4013#discussion_r534784870 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala ########## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.trash + +import scala.collection.JavaConverters._ + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.SegmentFileStore +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.statusmanager.SegmentStatusManager +import org.apache.carbondata.core.util.{CarbonProperties, CleanFilesUtil, TrashUtil} + +/** + * This object will manage the following data. + * 1. .Trash folder + * 2. stale segments without metadata + * 3. expired segments (MARKED_FOR_DELETE, Compacted, In Progress) + */ +object DataTrashManager { + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + /** + * clean garbage data + * 1. clean .Trash folder + * 2. clean stale segments without metadata + * 3. clean expired segments (MARKED_FOR_DELETE, Compacted, In Progress) + * + * @param carbonTable : CarbonTable Object + * @param partitionSpecs : Hive Partitions details + */ + def cleanGarbageData( + carbonTable: CarbonTable, + force: Boolean = false, + partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = { + var carbonCleanFilesLock: ICarbonLock = null + val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier + try { + val errorMsg = "Clean files request is failed for " + + s"${ carbonTable.getQualifiedName }" + + ". Not able to acquire the clean files lock due to another clean files " + + "operation is running in the background." + carbonCleanFilesLock = CarbonLockUtil.getLockObject(absoluteTableIdentifier, + LockUsage.CLEAN_FILES_LOCK, errorMsg) + // step 1: clean trash folder + cleanTrashFolder(carbonTable, force) + // step 2: clean stale segments which are not exists in metadata + cleanStaleSegments(carbonTable) Review comment: Is `moveStaleSegmentsToTrash` a better name for this function?
ajantha-bhat commented on a change in pull request #4013: URL: https://github.com/apache/carbondata/pull/4013#discussion_r534825881 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonTruncateCommand.scala ########## @@ -18,39 +18,31 @@ package org.apache.spark.sql.execution.command.mutation import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} -import org.apache.spark.sql.catalyst.analysis.NoSuchTableException -import org.apache.spark.sql.execution.command.{DataCommand, TruncateTableCommand} -import org.apache.spark.sql.execution.command.management.CarbonCleanFilesCommand -import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.execution.command.{Checker, DataCommand, TruncateTableCommand} import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.statusmanager.SegmentStatusManager case class CarbonTruncateCommand(child: TruncateTableCommand) extends DataCommand { override def processData(sparkSession: SparkSession): Seq[Row] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - val dbName = CarbonEnv.getDatabaseName(child.tableName.database)(sparkSession) - val tableName = child.tableName.table - setAuditTable(dbName, tableName) - val relation = CarbonEnv.getInstance(sparkSession).carbonMetaStore - .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation] - if (relation == null) { - throw new NoSuchTableException(dbName, tableName) - } - if (null == relation.carbonTable) { - LOGGER.error(s"Truncate table failed. table not found: $dbName.$child.tableName.table") - throw new NoSuchTableException(dbName, child.tableName.table) + Checker.validateTableExists(child.tableName.database, child.tableName.table, sparkSession) + val carbonTable = CarbonEnv.getCarbonTable( + child.tableName.database, child.tableName.table)(sparkSession) + setAuditTable(carbonTable) + if (!carbonTable.isTransactionalTable) { + LOGGER.error(s"Unsupported truncate non-transactional table") + throw new MalformedCarbonCommandException( + "Unsupported truncate non-transactional table") } if (child.partitionSpec.isDefined) { throw new MalformedCarbonCommandException( "Unsupported truncate table with specified partition") } - CarbonCleanFilesCommand( - databaseNameOp = Option(dbName), - tableName = Option(tableName), - None, - truncateTable = true - ).run(sparkSession) + Review comment: I didn't see this command in any document. If possible, please document it. I think it will just empty the table by removing all the data (success, fail, stale).
ajantha-bhat commented on a change in pull request #4013: URL: https://github.com/apache/carbondata/pull/4013#discussion_r534852698 ########## File path: core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java ########## @@ -482,176 +482,6 @@ public boolean accept(CarbonFile file) { } - /** - * Handling of the clean up of old carbondata files, index files , delete delta, - * update status files. - * @param table clean up will be handled on this table. - * @param forceDelete if true then max query execution timeout will not be considered. - */ - public static void cleanUpDeltaFiles(CarbonTable table, boolean forceDelete) throws IOException { Review comment: I didn't see delta file cleanup in the trash manager; will there be any impact because of this? I think we have to test with stale delta files once before removing this code.
ajantha-bhat commented on pull request #4013: URL: https://github.com/apache/carbondata/pull/4013#issuecomment-737734503 @QiangCai : I have finished the review. Have you tested with some stale delta files? Now I think they will never be cleaned up (when the segment is not stale but only the delta files are stale), since that code is removed.
ajantha-bhat commented on a change in pull request #4013: URL: https://github.com/apache/carbondata/pull/4013#discussion_r534863821 ########## File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java ########## @@ -208,6 +205,14 @@ public static boolean isTrashRetentionTimeoutExceeded(long fileTimestamp) { return difference > retentionMilliSeconds; } + /** + * whether trash data outside of .Trash folder is time out + */ + public static boolean isTrashDataTimeout(long fileTimestamp) { + return isTrashRetentionTimeoutExceeded(fileTimestamp) && Review comment: ok
ajantha-bhat commented on a change in pull request #4013: URL: https://github.com/apache/carbondata/pull/4013#discussion_r534865268 ########## File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java ########## @@ -146,8 +146,8 @@ public static void cleanStaleSegmentsForPartitionTable(CarbonTable carbonTable) * in the metadata folder and is not present in the table status file is considered as a * stale segment. Only comparing from tablestatus file, not checking tablestatus.history file */ - private static void getStaleSegmentFiles(CarbonTable carbonTable, List<String> staleSegmentFiles, - List<String> redundantSegmentFile) { + private static void collectStaleSegmentFiles(CarbonTable carbonTable, Review comment: ok, doesn't make much difference
akashrn5 commented on a change in pull request #4013: URL: https://github.com/apache/carbondata/pull/4013#discussion_r534930213 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala ########## @@ -90,7 +90,6 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, while (loadsToMerge.size() > 1 || needSortSingleSegment(loadsToMerge)) { val lastSegment = sortedSegments.get(sortedSegments.size() - 1) - deletePartialLoadsInCompaction() Review comment: if you are removing this, how are you handling the case where the segment number is the same in the next compaction retry, since we don't have any entry in the table status file during compaction? ########## File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala ########## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.trash + +import scala.collection.JavaConverters._ + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.SegmentFileStore +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.statusmanager.SegmentStatusManager +import org.apache.carbondata.core.util.{CarbonProperties, CleanFilesUtil, TrashUtil} + +/** + * This object will manage the following data. + * 1. .Trash folder + * 2. stale segments without metadata + * 3. expired segments (MARKED_FOR_DELETE, Compacted, In Progress) + */ +object DataTrashManager { + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + /** + * clean garbage data + * 1. clean .Trash folder + * 2. clean stale segments without metadata + * 3. clean expired segments (MARKED_FOR_DELETE, Compacted, In Progress) + * + * @param carbonTable : CarbonTable Object + * @param partitionSpecs : Hive Partitions details Review comment: please remove the @param tags; the names clearly indicate what they are ########## File path: core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java ########## @@ -482,176 +482,6 @@ public boolean accept(CarbonFile file) { } - /** - * Handling of the clean up of old carbondata files, index files , delete delta, - * update status files. - * @param table clean up will be handled on this table. - * @param forceDelete if true then max query execution timeout will not be considered.
- */ - public static void cleanUpDeltaFiles(CarbonTable table, boolean forceDelete) throws IOException { - - SegmentStatusManager ssm = new SegmentStatusManager(table.getAbsoluteTableIdentifier()); - - LoadMetadataDetails[] details = - SegmentStatusManager.readLoadMetadata(table.getMetadataPath()); - - SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(table); - SegmentUpdateDetails[] segmentUpdateDetails = updateStatusManager.getUpdateStatusDetails(); - // hold all the segments updated so that wen can check the delta files in them, ne need to - // check the others. - Set<String> updatedSegments = new HashSet<>(); - for (SegmentUpdateDetails updateDetails : segmentUpdateDetails) { - updatedSegments.add(updateDetails.getSegmentName()); - } - - String validUpdateStatusFile = ""; - - boolean isAbortedFile = true; - - boolean isInvalidFile = false; - - // take the update status file name from 0th segment. - validUpdateStatusFile = ssm.getUpdateStatusFileName(details); - // scan through each segment. - for (LoadMetadataDetails segment : details) { - // if this segment is valid then only we will go for delta file deletion. - // if the segment is mark for delete or compacted then any way it will get deleted. - if (segment.getSegmentStatus() == SegmentStatus.SUCCESS - || segment.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS) { - // when there is no update operations done on table, then no need to go ahead. So - // just check the update delta start timestamp and proceed if not empty - if (!segment.getUpdateDeltaStartTimestamp().isEmpty() - || updatedSegments.contains(segment.getLoadName())) { - // take the list of files from this segment. - String segmentPath = CarbonTablePath.getSegmentPath( - table.getAbsoluteTableIdentifier().getTablePath(), segment.getLoadName()); - CarbonFile segDir = - FileFactory.getCarbonFile(segmentPath); - CarbonFile[] allSegmentFiles = segDir.listFiles(); - - // now handle all the delete delta files which needs to be deleted. - // there are 2 cases here . - // 1. if the block is marked as compacted then the corresponding delta files - // can be deleted if query exec timeout is done. - // 2. if the block is in success state then also there can be delete - // delta compaction happened and old files can be deleted. - - SegmentUpdateDetails[] updateDetails = updateStatusManager.readLoadMetadata(); - for (SegmentUpdateDetails block : updateDetails) { - CarbonFile[] completeListOfDeleteDeltaFiles; - CarbonFile[] invalidDeleteDeltaFiles; - - if (!block.getSegmentName().equalsIgnoreCase(segment.getLoadName())) { - continue; - } - - // aborted scenario. 
- invalidDeleteDeltaFiles = updateStatusManager - .getDeleteDeltaInvalidFilesList(block, false, - allSegmentFiles, isAbortedFile); - for (CarbonFile invalidFile : invalidDeleteDeltaFiles) { - boolean doForceDelete = true; - compareTimestampsAndDelete(invalidFile, doForceDelete, false); - } - - // case 1 - if (CarbonUpdateUtil.isBlockInvalid(block.getSegmentStatus())) { - completeListOfDeleteDeltaFiles = updateStatusManager - .getDeleteDeltaInvalidFilesList(block, true, - allSegmentFiles, isInvalidFile); - for (CarbonFile invalidFile : completeListOfDeleteDeltaFiles) { - compareTimestampsAndDelete(invalidFile, forceDelete, false); - } - - } else { - invalidDeleteDeltaFiles = updateStatusManager - .getDeleteDeltaInvalidFilesList(block, false, - allSegmentFiles, isInvalidFile); - for (CarbonFile invalidFile : invalidDeleteDeltaFiles) { - compareTimestampsAndDelete(invalidFile, forceDelete, false); - } - } - } - } - // handle cleanup of merge index files and data files after small files merge happened for - // SI table - cleanUpDataFilesAfterSmallFilesMergeForSI(table, segment); - } - } - - // delete the update table status files which are old. - if (null != validUpdateStatusFile && !validUpdateStatusFile.isEmpty()) { - - final String updateStatusTimestamp = validUpdateStatusFile - .substring(validUpdateStatusFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1); - - String tablePath = table.getAbsoluteTableIdentifier().getTablePath(); - CarbonFile metaFolder = FileFactory.getCarbonFile( - CarbonTablePath.getMetadataPath(tablePath)); - - CarbonFile[] invalidUpdateStatusFiles = metaFolder.listFiles(new CarbonFileFilter() { - @Override - public boolean accept(CarbonFile file) { - if (file.getName().startsWith(CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME)) { - // CHECK if this is valid or not. - // we only send invalid ones to delete. - return !file.getName().endsWith(updateStatusTimestamp); - } - return false; - } - }); - - for (CarbonFile invalidFile : invalidUpdateStatusFiles) { - compareTimestampsAndDelete(invalidFile, forceDelete, true); - } - } - } - - /** - * this is the clean up added specifically for SI table, because after we merge the data files - * inside the secondary index table, we need to delete the stale carbondata files. - * refer org.apache.spark.sql.secondaryindex.rdd.CarbonSIRebuildRDD - */ - private static void cleanUpDataFilesAfterSmallFilesMergeForSI(CarbonTable table, Review comment: how is this scenario handled after this removal? ########## File path: core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java ########## @@ -482,176 +482,6 @@ public boolean accept(CarbonFile file) { } - /** - * Handling of the clean up of old carbondata files, index files , delete delta, - * update status files. - * @param table clean up will be handled on this table. - * @param forceDelete if true then max query execution timeout will not be considered. - */ - public static void cleanUpDeltaFiles(CarbonTable table, boolean forceDelete) throws IOException { Review comment: agree with @ajantha-bhat; please test. Also, this method used to take care of the aborted scenario; how is that being taken care of now?
########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/DropIndexCommand.scala ########## @@ -210,12 +210,6 @@ private[sql] case class DropIndexCommand( logError("Table metadata unlocking is unsuccessful, index table may be in stale state") } } - // in case if the the physical folders still exists for the index table - // but the carbon and hive info for the index table is removed, - // DROP INDEX IF EXISTS should clean up those physical directories - if (ifExistsSet && carbonTable.isEmpty) { Review comment: I think we should keep this code block: from the comment it is clear that such scenarios will occur, and we have observed them many times ########## File path: integration/spark/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala ########## @@ -34,5 +34,6 @@ case class CleanFilesPreEvent(carbonTable: CarbonTable, sparkSession: SparkSession) * @param carbonTable * @param sparkSession */ -case class CleanFilesPostEvent(carbonTable: CarbonTable, sparkSession: SparkSession) - extends Event with CleanFilesEventInfo +case class CleanFilesPostEvent(carbonTable: CarbonTable, Review comment: move `carbonTable` also to the next line
QiangCai commented on a change in pull request #4013: URL: https://github.com/apache/carbondata/pull/4013#discussion_r534982270 ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonTruncateCommand.scala ########## @@ -18,39 +18,31 @@ package org.apache.spark.sql.execution.command.mutation import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} -import org.apache.spark.sql.catalyst.analysis.NoSuchTableException -import org.apache.spark.sql.execution.command.{DataCommand, TruncateTableCommand} -import org.apache.spark.sql.execution.command.management.CarbonCleanFilesCommand -import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.execution.command.{Checker, DataCommand, TruncateTableCommand} import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.statusmanager.SegmentStatusManager case class CarbonTruncateCommand(child: TruncateTableCommand) extends DataCommand { override def processData(sparkSession: SparkSession): Seq[Row] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - val dbName = CarbonEnv.getDatabaseName(child.tableName.database)(sparkSession) - val tableName = child.tableName.table - setAuditTable(dbName, tableName) - val relation = CarbonEnv.getInstance(sparkSession).carbonMetaStore - .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation] - if (relation == null) { - throw new NoSuchTableException(dbName, tableName) - } - if (null == relation.carbonTable) { - LOGGER.error(s"Truncate table failed. table not found: $dbName.$child.tableName.table") - throw new NoSuchTableException(dbName, child.tableName.table) + Checker.validateTableExists(child.tableName.database, child.tableName.table, sparkSession) + val carbonTable = CarbonEnv.getCarbonTable( + child.tableName.database, child.tableName.table)(sparkSession) + setAuditTable(carbonTable) + if (!carbonTable.isTransactionalTable) { + LOGGER.error(s"Unsupported truncate non-transactional table") + throw new MalformedCarbonCommandException( + "Unsupported truncate non-transactional table") } if (child.partitionSpec.isDefined) { throw new MalformedCarbonCommandException( "Unsupported truncate table with specified partition") } - CarbonCleanFilesCommand( - databaseNameOp = Option(dbName), - tableName = Option(tableName), - None, - truncateTable = true - ).run(sparkSession) + Review comment: let clean files remove the data.