vikramahuja1001 opened a new pull request #3917: URL: https://github.com/apache/carbondata/pull/3917

### Why is this PR needed?

### What changes were proposed in this PR?

### Does this PR introduce any user interface change?
- No
- Yes. (please explain the change and update document)

### Is any new testcase added?
- No
- Yes

----------------------------------------------------------------
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email]
CarbonDataQA1 commented on pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#issuecomment-689476467 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2276/

CarbonDataQA1 commented on pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#issuecomment-689476907 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4015/

CarbonDataQA1 commented on pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#issuecomment-689551682 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4016/

CarbonDataQA1 commented on pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#issuecomment-689552007 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2277/
Indhumathi27 commented on a change in pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#discussion_r485655081 ########## File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ########## @@ -1106,19 +1107,39 @@ public static void cleanSegments(CarbonTable table, List<PartitionSpec> partitio */ public static void deleteSegment(String tablePath, Segment segment, List<PartitionSpec> partitionSpecs, - SegmentUpdateStatusManager updateStatusManager) throws Exception { + SegmentUpdateStatusManager updateStatusManager, String tableName, String DatabaseName) + throws Exception { SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName()); List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true, FileFactory.getConfiguration()); Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap(); for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) { + // If the file to be deleted is a carbondata file, copy that file to the trash folder. 
+ if (entry.getKey().endsWith(".carbondata")) { Review comment: Can Use CarbonCommonConstants FACT_FILE_EXT ########## File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ########## @@ -1106,19 +1107,39 @@ public static void cleanSegments(CarbonTable table, List<PartitionSpec> partitio */ public static void deleteSegment(String tablePath, Segment segment, List<PartitionSpec> partitionSpecs, - SegmentUpdateStatusManager updateStatusManager) throws Exception { + SegmentUpdateStatusManager updateStatusManager, String tableName, String DatabaseName) Review comment: ```suggestion SegmentUpdateStatusManager updateStatusManager, String tableName, String databaseName) ``` ########## File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala ########## @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.cleanfiles + +import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit} + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.fs.permission.{FsAction, FsPermission} + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore} +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.mutate.CarbonUpdateUtil +import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.carbondata.core.util.path.CarbonTablePath + +object CleanFilesUtil { + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + /** + * The method deletes all data if forceTableCLean <true> and lean garbage segment + * (MARKED_FOR_DELETE state) if forceTableCLean <false> + * + * @param dbName : Database name + * @param tableName : Table name + * @param tablePath : Table path + * @param carbonTable : CarbonTable Object <null> in case of force clean + * @param forceTableClean : <true> for force clean it will delete all data + * <false> it will clean garbage segment (MARKED_FOR_DELETE state) + * @param currentTablePartitions : Hive Partitions details + */ + def cleanFiles( + dbName: String, Review comment: please check and format the code ########## File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ########## @@ -1106,19 +1107,39 @@ public static void 
cleanSegments(CarbonTable table, List<PartitionSpec> partitio */ public static void deleteSegment(String tablePath, Segment segment, List<PartitionSpec> partitionSpecs, - SegmentUpdateStatusManager updateStatusManager) throws Exception { + SegmentUpdateStatusManager updateStatusManager, String tableName, String DatabaseName) + throws Exception { SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName()); List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true, FileFactory.getConfiguration()); Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap(); for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) { + // If the file to be deleted is a carbondata file, copy that file to the trash folder. + if (entry.getKey().endsWith(".carbondata")) { + CarbonFile carbonFileToCopy = CarbonUtil.getOrCreateTrashFolder(DatabaseName, tableName); + FileUtils.copyFileToDirectory(new File(entry.getKey()), new File(carbonFileToCopy + .getAbsolutePath())); + } FileFactory.deleteFile(entry.getKey()); for (String file : entry.getValue()) { String[] deltaFilePaths = updateStatusManager.getDeleteDeltaFilePath(file, segment.getSegmentNo()); for (String deltaFilePath : deltaFilePaths) { + // If the file to be deleted is a carbondata file, copy that file to the trash folder. + if (deltaFilePath.endsWith(".carbondata")) { Review comment: Same as previous comment ########## File path: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ########## @@ -3273,6 +3290,40 @@ public static CarbonFile createTempFolderForIndexServer(String queryId) } } + /** + * Below method will either create the trash folder based on table name, or if the + * folder already exists, will return that folder. 
+ */ + public static CarbonFile getOrCreateTrashFolder(String databaseName, String tableName) + throws IOException { + String pathOfTrashFolder = getTrashFolderPath(); + if (databaseName.length() == 0 && tableName.length() == 0) { + if (!FileFactory.isFileExist(pathOfTrashFolder)) { + // Create the new index server temp directory if it does not exist + LOGGER.info("Creating Trash folder for Deleted segments at:" + pathOfTrashFolder); + FileFactory + .createDirectoryAndSetPermission(pathOfTrashFolder, + new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); + } + return null; + } + String filePath = pathOfTrashFolder + CarbonCommonConstants.FILE_SEPARATOR + databaseName + + CarbonCommonConstants.UNDERSCORE + tableName; + CarbonFile file = FileFactory.getCarbonFile(filePath); + if (!FileFactory.isFileExist(filePath)) { Review comment: ```suggestion if (!file.exists) { ``` ########## File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala ########## @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.cleanfiles + +import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit} + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.fs.permission.{FsAction, FsPermission} + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore} +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.mutate.CarbonUpdateUtil +import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.carbondata.core.util.path.CarbonTablePath + +object CleanFilesUtil { + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + /** + * The method deletes all data if forceTableCLean <true> and lean garbage segment + * (MARKED_FOR_DELETE state) if forceTableCLean <false> + * + * @param dbName : Database name + * @param tableName : Table name + * @param tablePath : Table path + * @param carbonTable : CarbonTable Object <null> in case of force clean + * @param forceTableClean : <true> for force clean it will delete all data + * <false> it will clean garbage segment (MARKED_FOR_DELETE state) + * @param currentTablePartitions : Hive Partitions details + */ + def cleanFiles( + dbName: String, + tableName: String, + tablePath: String, + carbonTable: CarbonTable, + forceTableClean: Boolean, + currentTablePartitions: Option[Seq[PartitionSpec]] = None, + truncateTable: Boolean = false): Unit = { + var 
carbonCleanFilesLock: ICarbonLock = null + val absoluteTableIdentifier = if (forceTableClean) { + AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName) + } else { + carbonTable.getAbsoluteTableIdentifier + } + try { + val errorMsg = "Clean files request is failed for " + + s"$dbName.$tableName" + + ". Not able to acquire the clean files lock due to another clean files " + + "operation is running in the background." + // in case of force clean the lock is not required + if (forceTableClean) { + FileFactory.deleteAllCarbonFilesOfDir( + FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath)) + } else { + carbonCleanFilesLock = + CarbonLockUtil + .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg) + if (truncateTable) { + SegmentStatusManager.truncateTable(carbonTable) + } + SegmentStatusManager.deleteLoadsAndUpdateMetadata( + carbonTable, true, currentTablePartitions.map(_.asJava).orNull) + CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true) + currentTablePartitions match { + case Some(partitions) => + SegmentFileStore.cleanSegments( + carbonTable, + currentTablePartitions.map(_.asJava).orNull, + true) + case _ => + } + } + } finally { + if (currentTablePartitions.equals(None)) { + cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec]) + } else { + cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList) + } + + if (carbonCleanFilesLock != null) { + CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK) + } + } + } + + + /** + * delete partition folders recursively + * + * @param carbonTable + * @param partitionSpecList + */ + def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable, + partitionSpecList: List[PartitionSpec]): Unit = { + if (carbonTable != null && carbonTable.isHivePartitionTable) { + val loadMetadataDetails = SegmentStatusManager + .readLoadMetadata(carbonTable.getMetadataPath) + + val carbonFile = 
FileFactory.getCarbonFile(carbonTable.getTablePath) + + // list all files from table path + val listOfDefaultPartFilesIterator = carbonFile.listFiles(true) + loadMetadataDetails.foreach { metadataDetail => + if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) && + metadataDetail.getSegmentFile == null) { + val loadStartTime: Long = metadataDetail.getLoadStartTime + // delete all files of @loadStartTime from table path + cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime) + partitionSpecList.foreach { + partitionSpec => + val partitionLocation = partitionSpec.getLocation + // For partition folder outside the tablePath + if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) { + val partitionCarbonFile = FileFactory + .getCarbonFile(partitionLocation.toString) + // list all files from partitionLocation + val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true) + // delete all files of @loadStartTime from externalPath + cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime) + } + } + } + } + } + } + + /** + * + * @param carbonFiles + * @param timestamp + */ + private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile], + timestamp: Long): Unit = { + carbonFiles.asScala.foreach { + carbonFile => + val filePath = carbonFile.getPath + val fileName = carbonFile.getName + if (CarbonTablePath.DataFileUtil.compareCarbonFileTimeStamp(fileName, timestamp)) { + // delete the file + FileFactory.deleteFile(filePath) + } + } + } + + /** + * The in-progress segments which are in stale state will be marked as deleted + * when driver is initializing. + * + * @param databaseLocation + * @param dbName + */ + def cleanInProgressSegments(databaseLocation: String, dbName: String): Unit = { + val loaderDriver = CarbonProperties.getInstance(). 
+ getProperty(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER, + CarbonCommonConstants.DATA_MANAGEMENT_DRIVER_DEFAULT).toBoolean + if (!loaderDriver) { + return + } + try { + if (FileFactory.isFileExist(databaseLocation)) { + val file = FileFactory.getCarbonFile(databaseLocation) Review comment: Can move this line up and replace the check with file.exists.
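Indhumathi27's first suggestion, replacing the bare `".carbondata"` string literal with the `CarbonCommonConstants.FACT_FILE_EXT` constant, can be sketched as below. This is a standalone illustration, not CarbonData's actual code: the class name is hypothetical, and only the constant's value is assumed to match `FACT_FILE_EXT`.

```java
public class FactFileCheck {
    // Stand-in for CarbonCommonConstants.FACT_FILE_EXT (value assumed).
    static final String FACT_FILE_EXT = ".carbondata";

    // Used in place of entry.getKey().endsWith(".carbondata"), so the
    // extension string is defined in exactly one place.
    static boolean isFactFile(String filePath) {
        return filePath.endsWith(FACT_FILE_EXT);
    }
}
```

Centralizing the extension in a constant means a future format change (or a typo) is caught in one place rather than at every call site that hard-codes the literal.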
CarbonDataQA1 commented on pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#issuecomment-689682451 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2284/

CarbonDataQA1 commented on pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#issuecomment-689683149 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4022/

CarbonDataQA1 commented on pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#issuecomment-689856094 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4028/

CarbonDataQA1 commented on pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#issuecomment-689858430 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2289/

QiangCai commented on pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#issuecomment-689946875 please describe the detail of the strategy.

vikramahuja1001 commented on pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#issuecomment-689983615 retest this please

CarbonDataQA1 commented on pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#issuecomment-690056910 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4032/

CarbonDataQA1 commented on pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#issuecomment-690060524 Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2293/
BrooksLI commented on a change in pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#discussion_r486292381 ########## File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ########## @@ -1106,19 +1107,39 @@ public static void cleanSegments(CarbonTable table, List<PartitionSpec> partitio */ public static void deleteSegment(String tablePath, Segment segment, List<PartitionSpec> partitionSpecs, - SegmentUpdateStatusManager updateStatusManager) throws Exception { + SegmentUpdateStatusManager updateStatusManager, String tableName, String DatabaseName) + throws Exception { SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName()); List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true, FileFactory.getConfiguration()); Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap(); for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) { + // If the file to be deleted is a carbondata file, copy that file to the trash folder. + if (entry.getKey().endsWith(".carbondata")) { Review comment: Can we extract the method rather than duplicate it?
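BrooksLI's point is that the "copy to trash before delete" check appears at each deletion site and could live in one helper. A minimal sketch of such an extraction is below; the class and method names are hypothetical, and it uses `java.nio` for self-containment, whereas the actual PR goes through `CarbonUtil.getOrCreateTrashFolder` and `FileUtils.copyFileToDirectory`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class TrashCopyHelper {
    // Stand-in for CarbonCommonConstants.FACT_FILE_EXT (value assumed).
    static final String FACT_FILE_EXT = ".carbondata";

    static boolean isFactFile(String fileName) {
        return fileName.endsWith(FACT_FILE_EXT);
    }

    // Copies the file into trashDir only when it is a carbondata fact file;
    // callers then delete the original unconditionally, as the PR does.
    static void copyToTrashIfFactFile(Path file, Path trashDir) throws IOException {
        if (isFactFile(file.getFileName().toString())) {
            Files.createDirectories(trashDir);
            Files.copy(file, trashDir.resolve(file.getFileName()),
                StandardCopyOption.REPLACE_EXISTING);
        }
    }
}
```

With a helper like this, the three duplicated `endsWith(".carbondata")` blocks in `deleteSegment` collapse into one call each, and the trash-folder policy has a single place to change.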
marchpure commented on a change in pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#discussion_r488464250 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala ########## @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.cleanfiles + +import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit} + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.fs.permission.{FsAction, FsPermission} + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore} +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.mutate.CarbonUpdateUtil +import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.carbondata.core.util.path.CarbonTablePath + +object CleanFilesUtil { + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + /** + * The method deletes all data if forceTableCLean <true> and lean garbage segment + * (MARKED_FOR_DELETE state) if forceTableCLean <false> + * + * @param dbName : Database name + * @param tableName : Table name + * @param tablePath : Table path + * @param carbonTable : CarbonTable Object <null> in case of force clean + * @param forceTableClean : <true> for force clean it will delete all data + * <false> it will clean garbage segment (MARKED_FOR_DELETE state) + * @param currentTablePartitions : Hive Partitions details + */ + def cleanFiles( + dbName: String, + tableName: String, + tablePath: String, + carbonTable: CarbonTable, + forceTableClean: Boolean, + currentTablePartitions: Option[Seq[PartitionSpec]] = None, + truncateTable: Boolean = false): Unit = { + var 
carbonCleanFilesLock: ICarbonLock = null + val absoluteTableIdentifier = if (forceTableClean) { + AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName) + } else { + carbonTable.getAbsoluteTableIdentifier + } + try { + val errorMsg = "Clean files request is failed for " + + s"$dbName.$tableName" + + ". Not able to acquire the clean files lock due to another clean files " + + "operation is running in the background." + // in case of force clean the lock is not required + if (forceTableClean) { Review comment: forceTableClean is too drastic; please remove it. ########## File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala ########## @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.carbondata.cleanfiles + +import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit} + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.fs.permission.{FsAction, FsPermission} + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore} +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.mutate.CarbonUpdateUtil +import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.carbondata.core.util.path.CarbonTablePath + +object CleanFilesUtil { + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + /** + * The method deletes all data if forceTableCLean <true> and lean garbage segment + * (MARKED_FOR_DELETE state) if forceTableCLean <false> + * + * @param dbName : Database name + * @param tableName : Table name + * @param tablePath : Table path + * @param carbonTable : CarbonTable Object <null> in case of force clean + * @param forceTableClean : <true> for force clean it will delete all data + * <false> it will clean garbage segment (MARKED_FOR_DELETE state) + * @param currentTablePartitions : Hive Partitions details + */ + def cleanFiles( + dbName: String, + tableName: String, + tablePath: String, + carbonTable: CarbonTable, + forceTableClean: Boolean, + currentTablePartitions: Option[Seq[PartitionSpec]] = None, + truncateTable: Boolean = false): Unit = { + var 
carbonCleanFilesLock: ICarbonLock = null + val absoluteTableIdentifier = if (forceTableClean) { + AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName) + } else { + carbonTable.getAbsoluteTableIdentifier + } + try { + val errorMsg = "Clean files request is failed for " + + s"$dbName.$tableName" + + ". Not able to acquire the clean files lock due to another clean files " + + "operation is running in the background." + // in case of force clean the lock is not required + if (forceTableClean) { + FileFactory.deleteAllCarbonFilesOfDir( + FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath)) + } else { + carbonCleanFilesLock = + CarbonLockUtil + .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg) + if (truncateTable) { + SegmentStatusManager.truncateTable(carbonTable) + } + SegmentStatusManager.deleteLoadsAndUpdateMetadata( + carbonTable, true, currentTablePartitions.map(_.asJava).orNull) + CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true) + currentTablePartitions match { + case Some(partitions) => + SegmentFileStore.cleanSegments( + carbonTable, + currentTablePartitions.map(_.asJava).orNull, + true) + case _ => + } + } + } finally { + if (currentTablePartitions.equals(None)) { + cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec]) + } else { + cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList) + } + + if (carbonCleanFilesLock != null) { Review comment: If cleanUpPartitionFoldersRecursively throws an exception, the clean files lock won't be released.
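marchpure's lock concern can be sketched as follows: in the quoted code, `cleanUpPartitionFoldersRecursively` runs inside the `finally` block before the unlock, so an exception there would leave the clean files lock held. Nesting a second try/finally around the cleanup guarantees the unlock always runs. The `Lock` interface below is a stand-in for `ICarbonLock`, not CarbonData's API.

```java
public class LockReleaseSketch {
    interface Lock { void unlock(); }

    static void runWithLock(Lock lock, Runnable mainWork, Runnable cleanup) {
        try {
            mainWork.run();
        } finally {
            try {
                // May throw, like cleanUpPartitionFoldersRecursively.
                cleanup.run();
            } finally {
                // Still released even when the cleanup step fails.
                lock.unlock();
            }
        }
    }
}
```

The same effect could be had by wrapping the cleanup call in its own try/catch, but the nested finally keeps the "unlock no matter what" invariant visible at a glance.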
########## File path: core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java ########## @@ -192,11 +204,33 @@ private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad, } private static boolean checkIfLoadCanBeDeletedPhysically(LoadMetadataDetails oneLoad, - boolean isForceDelete) { + boolean isForceDelete, AbsoluteTableIdentifier absoluteTableIdentifier) { // Check if the segment is added externally and path is set then do not delete it if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus() - || SegmentStatus.COMPACTED == oneLoad.getSegmentStatus()) && (oneLoad.getPath() == null + || SegmentStatus.COMPACTED == oneLoad.getSegmentStatus() || SegmentStatus + .INSERT_IN_PROGRESS == oneLoad.getSegmentStatus()) && (oneLoad.getPath() == null Review comment: In your logic, deleting an INSERT_IN_PROGRESS segment depends on a timeout threshold, not on a lock: once a segment has been INSERT_IN_PROGRESS for more than 1 hour, it will be deleted. ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala ########## @@ -108,12 +112,51 @@ case class CarbonCleanFilesCommand( Seq.empty } + def deleteStashInMetadataFolder(carbonTable: CarbonTable): Unit = { + val tableStatusLock = CarbonLockFactory + .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK) + val carbonLoadModel = new CarbonLoadModel + try { + if (tableStatusLock.lockWithRetries()) { + val tableStatusFilePath = CarbonTablePath + .getTableStatusFilePath(carbonTable.getTablePath) + val loadMetaDataDetails = SegmentStatusManager + .readTableStatusFile(tableStatusFilePath).filter(details => details.getSegmentStatus == + SegmentStatus.SUCCESS || details.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS) + .sortWith(_.getLoadName < _.getLoadName) + carbonLoadModel.setLoadMetadataDetails(loadMetaDataDetails.toList.asJava) + } else { + throw new
ConcurrentOperationException(carbonTable.getDatabaseName, + carbonTable.getTableName, "table status read", "clean files command") + } + } finally { + tableStatusLock.unlock() + } + val loadMetaDataDetails = carbonLoadModel.getLoadMetadataDetails.asScala + val segmentFileList = loadMetaDataDetails.map(f => CarbonTablePath.getSegmentFilesLocation( Review comment: Add code: if (loadMetaDataDetails.isEmpty) return; ########## File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala ########## @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * This method deletes all data if forceTableClean is <true> and cleans garbage segments
+   * (in MARKED_FOR_DELETE state) if forceTableClean is <false>.
+   *
+   * @param dbName : Database name
+   * @param tableName : Table name
+   * @param tablePath : Table path
+   * @param carbonTable : CarbonTable object, <null> in case of force clean
+   * @param forceTableClean : <true> for force clean, deletes all data;
+   *                          <false> cleans garbage segments (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive partition details
+   */
+  def cleanFiles(
+      dbName: String,
+      tableName: String,
+      tablePath: String,
+      carbonTable: CarbonTable,
+      forceTableClean: Boolean,
+      currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+      truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)

Review comment:
    truncateTable is too drastic; please remove it.
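The timeout heuristic discussed in the DeleteLoadFolders review above (an INSERT_IN_PROGRESS segment becomes eligible for physical deletion once it has been in that state longer than a threshold, regardless of locks) can be sketched as follows. The class and method names and the one-hour default are illustrative assumptions, not CarbonData's actual API.

```java
// Sketch of a staleness check for in-progress segments: compare the load
// start time against a configured threshold (assumed 1 hour here).
public class StaleSegmentCheck {
    // Assumed threshold; the review mentions 1 hour for INSERT_IN_PROGRESS segments.
    static final long STALE_THRESHOLD_MILLIS = 60L * 60 * 1000;

    // A segment stuck in INSERT_IN_PROGRESS is considered stale (and thus
    // physically deletable) once its load start time is older than the threshold.
    static boolean isStale(long loadStartTimeMillis, long nowMillis) {
        return nowMillis - loadStartTimeMillis > STALE_THRESHOLD_MILLIS;
    }

    public static void main(String[] args) {
        long now = 10_000_000L;
        System.out.println(isStale(now - 2 * STALE_THRESHOLD_MILLIS, now)); // true
        System.out.println(isStale(now - STALE_THRESHOLD_MILLIS / 2, now)); // false
    }
}
```

The reviewer's concern is that this time-based rule alone can delete a segment whose load is still genuinely running but slow, since no lock is consulted before deletion.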
CarbonDataQA1 commented on pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#issuecomment-701970889 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2541/
CarbonDataQA1 commented on pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#issuecomment-701971400 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4288/
CarbonDataQA1 commented on pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#issuecomment-706210381 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4350/
CarbonDataQA1 commented on pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#issuecomment-706213323 Build Failed with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2600/