vikramahuja1001 commented on a change in pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#discussion_r515789003 ########## File path: docs/cleanfiles.md ########## @@ -0,0 +1,78 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to you under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + + +## CLEAN FILES + +Clean files command is used to remove the Compacted, Marked For Delete ,In Progress which are stale and Partial(Segments which are missing from the table status file but their data is present) + segments from the store. + + Clean Files Command + ``` + CLEAN FILES FOR TABLE TABLE_NAME + ``` + + +### TRASH FOLDER + + Carbondata supports a Trash Folder which is used as a redundant folder where all the unnecessary files and folders are moved to during clean files operation. Review comment: done ########## File path: docs/cleanfiles.md ########## @@ -0,0 +1,78 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to you under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + + +## CLEAN FILES + +Clean files command is used to remove the Compacted, Marked For Delete ,In Progress which are stale and Partial(Segments which are missing from the table status file but their data is present) + segments from the store. + + Clean Files Command + ``` + CLEAN FILES FOR TABLE TABLE_NAME + ``` + + +### TRASH FOLDER + + Carbondata supports a Trash Folder which is used as a redundant folder where all the unnecessary files and folders are moved to during clean files operation. + This trash folder is mantained inside the table path. It is a hidden folder(.Trash). The segments that are moved to the trash folder are mantained under a timestamp + subfolder(timestamp at which clean files operation is called). This helps the user to list down segments by timestamp. By default all the timestamp sub-directory have an expiration + time of (3 days since that timestamp) and it can be configured by the user using the following carbon property + ``` + carbon.trash.expiration.time = "Number of days" + ``` + Once the timestamp subdirectory is expired as per the configured expiration day value, the subdirectory is deleted from the trash folder in the subsequent clean files command. + + + + +### DRY RUN + Support for dry run is provided before the actual clean files operation. This dry run operation will list down all the segments which are going to be manipulated during + the clean files operation. 
The dry run result will show the current location of the segment(it can be in FACT folder, Partition folder or trash folder) and where that segment Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
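The trash-folder retention described above (timestamp-named subfolders that expire a configurable number of days after the clean files run) can be sketched as below. This is a minimal illustration of the expiry rule only; the class and method names are hypothetical and not the actual CarbonData API.

```java
import java.util.concurrent.TimeUnit;

public class TrashExpiryCheck {

    // A trash subfolder is named after the millisecond timestamp of the
    // clean files run that created it. It is considered expired once the
    // configured retention period (default 3 days) has elapsed.
    static boolean isExpired(long folderTimestampMs, int retentionDays, long nowMs) {
        long retentionMs = TimeUnit.DAYS.toMillis(retentionDays);
        return folderTimestampMs + retentionMs < nowMs;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long fourDaysAgo = now - TimeUnit.DAYS.toMillis(4);
        long oneDayAgo = now - TimeUnit.DAYS.toMillis(1);
        // With the default 3-day retention, only the older folder expires.
        System.out.println(isExpired(fourDaysAgo, 3, now)); // true
        System.out.println(isExpired(oneDayAgo, 3, now));   // false
    }
}
```

A subsequent clean files command would delete only the subfolders for which this check returns true.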
ajantha-bhat commented on pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#issuecomment-720242738 retest this please |
CarbonDataQA1 commented on pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#issuecomment-720280224 |
vikramahuja1001 commented on a change in pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#discussion_r516787255 ########## File path: core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.util.path; + +import java.io.File; +import java.io.IOException; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.exception.CarbonFileException; +import org.apache.carbondata.core.util.CarbonUtil; + +import org.apache.commons.io.FileUtils; + +import org.apache.log4j.Logger; + +/** + * Mantains the trash folder in carbondata. This class has methods to copy data to the trash and + * remove data from the trash. 
+ */ +public final class TrashUtil { + + private static final Logger LOGGER = + LogServiceFactory.getLogService(CarbonUtil.class.getName()); + + /** + * The below method copies the complete a file to the trash folder. Provide necessary + * timestamp and the segment number in the suffixToAdd variable, so that the proper folder is + * created in the trash folder. + * + * @param carbonTablePath table path of the carbon table + * @param pathOfFileToCopy the files which are to be moved to the trash folder + * @param suffixToAdd timestamp, partition folder(if any) and segment number + * @return + */ + public static void copyDataToTrashFolderByFile(String carbonTablePath, String pathOfFileToCopy, + String suffixToAdd) { + String trashFolderPath = CarbonTablePath.getTrashFolderPath(carbonTablePath) + + CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd; + try { + if (new File(pathOfFileToCopy).exists()) { + FileUtils.copyFileToDirectory(new File(pathOfFileToCopy), new File(trashFolderPath)); + LOGGER.info("File: " + pathOfFileToCopy + " successfully copied to the trash folder: " + + trashFolderPath); + } + } catch (IOException e) { + LOGGER.error("Unable to copy " + pathOfFileToCopy + " to the trash folder", e); + } + } + + /** + * The below method copies the complete segment folder to the trash folder. Provide necessary + * timestamp and the segment number in the suffixToAdd variable, so that the proper folder is + * created in the trash folder. 
+ * + * @param carbonTablePath table path of the carbon table + * @param path the folder which are to be moved to the trash folder + * @param suffixToAdd timestamp, partition folder(if any) and segment number + * @return + */ + public static void copyDataToTrashBySegment(CarbonFile path, String carbonTablePath, + String suffixToAdd) { + String trashFolderPath = CarbonTablePath.getTrashFolderPath(carbonTablePath) + + CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd; + try { + FileUtils.copyDirectory(new File(path.getAbsolutePath()), new File(trashFolderPath)); + LOGGER.info("Segment: " + path.getAbsolutePath() + " has been copied to the trash folder" + + " successfully"); + } catch (IOException e) { + LOGGER.error("Unable to create the trash folder and copy data to it", e); + } + } + + /** + * The below method deletes timestamp subdirectories in the trash folder which have expired as + * per the user defined expiration time + */ + public static void deleteAllDataFromTrashFolderByTimeStamp(String carbonTablePath, Long timeStamp) + throws IOException { + String pathOfTrashFolder = CarbonTablePath.getTrashFolderPath(carbonTablePath); + // Deleting the timestamp based subdirectories in the trashfolder by the given timestamp. 
+ if (FileFactory.isFileExist(pathOfTrashFolder)) { + try { + List<CarbonFile> carbonFileList = FileFactory.getFolderList(pathOfTrashFolder); + for (CarbonFile carbonFile : carbonFileList) { + String[] splitPath = carbonFile.getAbsolutePath().split(CarbonCommonConstants + .FILE_SEPARATOR); + Long currentTime = Long.valueOf(new Timestamp(System.currentTimeMillis()).getTime()); + Long givenTime = Long.valueOf(splitPath[splitPath.length - 1]); + // If the timeStamp at which the timeStamp subdirectory has expired as per the user + // defined value, delete the complete timeStamp subdirectory + if (givenTime + timeStamp < currentTime) { + deleteDataFromTrashFolderByFile(carbonFile); + } else { + LOGGER.info("Timestamp folder not expired: " + carbonFile.getAbsolutePath()); Review comment: Cannot do it this way because then it will not have the timestamp subdirectory, segment id and partition folder if any. Only the absolutepath will have it. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
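The `splitPath` logic quoted in `deleteAllDataFromTrashFolderByTimeStamp` (and defended in the review comment above, since only the absolute path carries the timestamp subdirectory) reduces to taking the last path component as a millisecond timestamp. A self-contained sketch, with an illustrative path rather than a real store layout:

```java
public class TrashPathParse {

    // Mirrors the quoted TrashUtil logic: split the absolute path on the
    // file separator and parse the last component, which is the timestamp
    // of the clean files run that created the trash subdirectory.
    static long timestampFromTrashPath(String absolutePath, String separator) {
        String[] parts = absolutePath.split(separator);
        return Long.parseLong(parts[parts.length - 1]);
    }

    public static void main(String[] args) {
        // Hypothetical trash entry under a table path.
        String path = "/warehouse/db/table/.Trash/1604478000000";
        System.out.println(timestampFromTrashPath(path, "/")); // 1604478000000
    }
}
```

Note that `String.split` takes a regex, so this sketch assumes a separator like `/` that is regex-safe.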
vikramahuja1001 commented on a change in pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#discussion_r516788672 ########## File path: core/src/main/java/org/apache/carbondata/core/util/CleanUtil.java ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.util; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.index.IndexStoreManager; +import org.apache.carbondata.core.index.Segment; +import org.apache.carbondata.core.index.TableIndex; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.SegmentFileStore; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import org.apache.log4j.Logger; + +/** + * Mantains the code used in clean files command, to delete the load folders and move the data + * to trash folder + */ +public final class CleanUtil { + + private static final Logger LOGGER = + LogServiceFactory.getLogService(CarbonUtil.class.getName()); + + public static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable, + LoadMetadataDetails[] newAddedLoadHistoryList, boolean isForceDelete, + List<PartitionSpec> specs) { + LoadMetadataDetails[] currentDetails = + SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath()); + physicalFactAndMeasureMetadataDeletion(carbonTable, + currentDetails, + isForceDelete, + specs, + currentDetails); + if (newAddedLoadHistoryList != null && 
newAddedLoadHistoryList.length > 0) { + physicalFactAndMeasureMetadataDeletion(carbonTable, + newAddedLoadHistoryList, + isForceDelete, + specs, + currentDetails); + } + } + + /** + * Delete the invalid data physically from table. + * @param carbonTable table + * @param loadDetails Load details which need clean up + * @param isForceDelete is Force delete requested by user + * @param specs Partition specs + * @param currLoadDetails Current table status load details which are required for update manager. + */ + private static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable, + LoadMetadataDetails[] loadDetails, boolean isForceDelete, List<PartitionSpec> specs, + LoadMetadataDetails[] currLoadDetails) { + List<TableIndex> indexes = new ArrayList<>(); + try { + for (TableIndex index : IndexStoreManager.getInstance().getAllCGAndFGIndexes(carbonTable)) { + if (index.getIndexSchema().isIndex()) { + indexes.add(index); + } + } + } catch (IOException e) { + LOGGER.warn(String.format( + "Failed to get indexes for %s.%s, therefore the index files could not be cleaned.", + carbonTable.getAbsoluteTableIdentifier().getDatabaseName(), + carbonTable.getAbsoluteTableIdentifier().getTableName())); + } + SegmentUpdateStatusManager updateStatusManager = + new SegmentUpdateStatusManager(carbonTable, currLoadDetails); + for (final LoadMetadataDetails oneLoad : loadDetails) { + if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) { + try { + if (oneLoad.getSegmentFile() != null) { + SegmentFileStore.deleteSegment(carbonTable.getAbsoluteTableIdentifier().getTablePath(), + new Segment(oneLoad.getLoadName(), oneLoad.getSegmentFile()), specs, + updateStatusManager); + } else { + String path = getSegmentPath(carbonTable.getAbsoluteTableIdentifier(), oneLoad); + boolean status = false; + if (FileFactory.isFileExist(path)) { + CarbonFile file = FileFactory.getCarbonFile(path); + CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() { + + @Override + 
public boolean accept(CarbonFile file) { + return (CarbonTablePath.isCarbonDataFile(file.getName()) || + CarbonTablePath.isCarbonIndexFile(file.getName())); + } + }); + + //if there are no fact and msr metadata files present then no need to keep + //entry in metadata. + if (filesToBeDeleted.length == 0) { + status = true; + } else { + for (CarbonFile eachFile : filesToBeDeleted) { + if (!eachFile.delete()) { + LOGGER.warn("Unable to delete the file as per delete command " + eachFile + .getAbsolutePath()); Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
vikramahuja1001 commented on a change in pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#discussion_r516789476 ########## File path: core/src/main/java/org/apache/carbondata/core/util/CleanUtil.java ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.util; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.index.IndexStoreManager; +import org.apache.carbondata.core.index.Segment; +import org.apache.carbondata.core.index.TableIndex; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.SegmentFileStore; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import org.apache.log4j.Logger; + +/** + * Mantains the code used in clean files command, to delete the load folders and move the data + * to trash folder + */ +public final class CleanUtil { + + private static final Logger LOGGER = + LogServiceFactory.getLogService(CarbonUtil.class.getName()); + + public static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable, + LoadMetadataDetails[] newAddedLoadHistoryList, boolean isForceDelete, + List<PartitionSpec> specs) { + LoadMetadataDetails[] currentDetails = + SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath()); + physicalFactAndMeasureMetadataDeletion(carbonTable, + currentDetails, + isForceDelete, + specs, + currentDetails); + if (newAddedLoadHistoryList != null && 
newAddedLoadHistoryList.length > 0) { + physicalFactAndMeasureMetadataDeletion(carbonTable, + newAddedLoadHistoryList, + isForceDelete, + specs, + currentDetails); + } + } + + /** + * Delete the invalid data physically from table. + * @param carbonTable table + * @param loadDetails Load details which need clean up + * @param isForceDelete is Force delete requested by user + * @param specs Partition specs + * @param currLoadDetails Current table status load details which are required for update manager. + */ + private static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable, + LoadMetadataDetails[] loadDetails, boolean isForceDelete, List<PartitionSpec> specs, + LoadMetadataDetails[] currLoadDetails) { + List<TableIndex> indexes = new ArrayList<>(); + try { + for (TableIndex index : IndexStoreManager.getInstance().getAllCGAndFGIndexes(carbonTable)) { + if (index.getIndexSchema().isIndex()) { + indexes.add(index); + } + } + } catch (IOException e) { + LOGGER.warn(String.format( + "Failed to get indexes for %s.%s, therefore the index files could not be cleaned.", + carbonTable.getAbsoluteTableIdentifier().getDatabaseName(), + carbonTable.getAbsoluteTableIdentifier().getTableName())); + } + SegmentUpdateStatusManager updateStatusManager = + new SegmentUpdateStatusManager(carbonTable, currLoadDetails); + for (final LoadMetadataDetails oneLoad : loadDetails) { + if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) { + try { + if (oneLoad.getSegmentFile() != null) { + SegmentFileStore.deleteSegment(carbonTable.getAbsoluteTableIdentifier().getTablePath(), + new Segment(oneLoad.getLoadName(), oneLoad.getSegmentFile()), specs, + updateStatusManager); + } else { + String path = getSegmentPath(carbonTable.getAbsoluteTableIdentifier(), oneLoad); + boolean status = false; + if (FileFactory.isFileExist(path)) { + CarbonFile file = FileFactory.getCarbonFile(path); + CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() { + + @Override + 
public boolean accept(CarbonFile file) { + return (CarbonTablePath.isCarbonDataFile(file.getName()) || + CarbonTablePath.isCarbonIndexFile(file.getName())); + } + }); + + //if there are no fact and msr metadata files present then no need to keep + //entry in metadata. + if (filesToBeDeleted.length == 0) { + status = true; + } else { + for (CarbonFile eachFile : filesToBeDeleted) { + if (!eachFile.delete()) { + LOGGER.warn("Unable to delete the file as per delete command " + eachFile + .getAbsolutePath()); + status = false; + } else { + status = true; + } + } + } + // need to delete the complete folder. + if (status) { + if (!file.delete()) { + LOGGER.warn("Unable to delete the folder as per delete command " + file + .getAbsolutePath()); Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
vikramahuja1001 commented on a change in pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#discussion_r516789597 ########## File path: core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.util.path; + +import java.io.File; +import java.io.IOException; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.exception.CarbonFileException; +import org.apache.carbondata.core.util.CarbonUtil; + +import org.apache.commons.io.FileUtils; + +import org.apache.log4j.Logger; + +/** + * Mantains the trash folder in carbondata. This class has methods to copy data to the trash and + * remove data from the trash. 
+ */ +public final class TrashUtil { + + private static final Logger LOGGER = + LogServiceFactory.getLogService(CarbonUtil.class.getName()); + + /** + * The below method copies the complete a file to the trash folder. Provide necessary + * timestamp and the segment number in the suffixToAdd variable, so that the proper folder is + * created in the trash folder. + * + * @param carbonTablePath table path of the carbon table + * @param pathOfFileToCopy the files which are to be moved to the trash folder + * @param suffixToAdd timestamp, partition folder(if any) and segment number + * @return + */ + public static void copyDataToTrashFolderByFile(String carbonTablePath, String pathOfFileToCopy, + String suffixToAdd) { + String trashFolderPath = CarbonTablePath.getTrashFolderPath(carbonTablePath) + + CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd; + try { + if (new File(pathOfFileToCopy).exists()) { + FileUtils.copyFileToDirectory(new File(pathOfFileToCopy), new File(trashFolderPath)); + LOGGER.info("File: " + pathOfFileToCopy + " successfully copied to the trash folder: " + + trashFolderPath); + } + } catch (IOException e) { + LOGGER.error("Unable to copy " + pathOfFileToCopy + " to the trash folder", e); + } + } + + /** + * The below method copies the complete segment folder to the trash folder. Provide necessary + * timestamp and the segment number in the suffixToAdd variable, so that the proper folder is + * created in the trash folder. 
+ * + * @param carbonTablePath table path of the carbon table + * @param path the folder which are to be moved to the trash folder + * @param suffixToAdd timestamp, partition folder(if any) and segment number + * @return + */ + public static void copyDataToTrashBySegment(CarbonFile path, String carbonTablePath, + String suffixToAdd) { + String trashFolderPath = CarbonTablePath.getTrashFolderPath(carbonTablePath) + + CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd; + try { + FileUtils.copyDirectory(new File(path.getAbsolutePath()), new File(trashFolderPath)); + LOGGER.info("Segment: " + path.getAbsolutePath() + " has been copied to the trash folder" + + " successfully"); + } catch (IOException e) { + LOGGER.error("Unable to create the trash folder and copy data to it", e); + } + } + + /** + * The below method deletes timestamp subdirectories in the trash folder which have expired as + * per the user defined expiration time + */ + public static void deleteAllDataFromTrashFolderByTimeStamp(String carbonTablePath, Long timeStamp) + throws IOException { + String pathOfTrashFolder = CarbonTablePath.getTrashFolderPath(carbonTablePath); + // Deleting the timestamp based subdirectories in the trashfolder by the given timestamp. + if (FileFactory.isFileExist(pathOfTrashFolder)) { + try { + List<CarbonFile> carbonFileList = FileFactory.getFolderList(pathOfTrashFolder); + for (CarbonFile carbonFile : carbonFileList) { + String[] splitPath = carbonFile.getAbsolutePath().split(CarbonCommonConstants + .FILE_SEPARATOR); + Long currentTime = Long.valueOf(new Timestamp(System.currentTimeMillis()).getTime()); + Long givenTime = Long.valueOf(splitPath[splitPath.length - 1]); Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
vikramahuja1001 commented on a change in pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#discussion_r516789762 ########## File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ########## @@ -1427,6 +1427,23 @@ private CarbonCommonConstants() { public static final String BITSET_PIPE_LINE_DEFAULT = "true"; + /** + * this is the user defined time(in days), when a specific timestamp subdirectory in + * trash folder will expire + */ + @CarbonProperty + public static final String CARBON_TRASH_EXPIRATION_DAYS = "carbon.trash.expiration.days"; + + /** + * Default expiration time of trash folder is 3 days. + */ + public static final String CARBON_TRASH_EXPIRATION_DAYS_DEFAULT = "3"; Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
vikramahuja1001 commented on a change in pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#discussion_r516789854 ########## File path: core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ########## @@ -2116,6 +2087,28 @@ public int getMaxSIRepairLimit(String dbName, String tableName) { return Math.abs(Integer.parseInt(thresholdValue)); } + /** + * The below method returns the microseconds after which the trash folder will expire + */ + public long getTrashFolderExpirationTime() { + String configuredValue = getProperty(CarbonCommonConstants.CARBON_TRASH_EXPIRATION_DAYS, + CarbonCommonConstants.CARBON_TRASH_EXPIRATION_DAYS_DEFAULT); + Integer result = 0; + try { + result = Integer.parseInt(configuredValue); + if (result < 0) { + LOGGER.warn("Value of carbon.trash.expiration.days is negative, taking default value"); + result = Integer.parseInt(CARBON_TRASH_EXPIRATION_DAYS_DEFAULT); + } + } catch (NumberFormatException e) { + LOGGER.error("Invalid value configured for CarbonCommonConstants" + + ".CARBON_TRASH_EXPIRATION_DAYS, considering the default value"); Review comment: done ########## File path: core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ########## @@ -2116,6 +2087,28 @@ public int getMaxSIRepairLimit(String dbName, String tableName) { return Math.abs(Integer.parseInt(thresholdValue)); } + /** + * The below method returns the microseconds after which the trash folder will expire + */ + public long getTrashFolderExpirationTime() { + String configuredValue = getProperty(CarbonCommonConstants.CARBON_TRASH_EXPIRATION_DAYS, + CarbonCommonConstants.CARBON_TRASH_EXPIRATION_DAYS_DEFAULT); + Integer result = 0; + try { + result = Integer.parseInt(configuredValue); + if (result < 0) { + LOGGER.warn("Value of carbon.trash.expiration.days is negative, taking default value"); + result = Integer.parseInt(CARBON_TRASH_EXPIRATION_DAYS_DEFAULT); + } + } catch (NumberFormatException e) { + 
LOGGER.error("Invalid value configured for CarbonCommonConstants" + + ".CARBON_TRASH_EXPIRATION_DAYS, considering the default value"); + result = Integer.parseInt(CARBON_TRASH_EXPIRATION_DAYS_DEFAULT); + } + Long microSecondsInADay = Long.valueOf(TimeUnit.DAYS.toMillis(1)); + return result * microSecondsInADay; Review comment: done ########## File path: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ########## @@ -3441,4 +3443,33 @@ public static void agingTempFolderForIndexServer(long agingTime)throws }); } } + + /** + * The below method tries to get the segment lock for the given segment. + */ + public static boolean tryGettingSegmentLock(LoadMetadataDetails oneLoad, + AbsoluteTableIdentifier absoluteTableIdentifier) { + ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier, + CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK); + boolean canBeDeleted; + try { + if (segmentLock.lockWithRetries(CarbonCommonConstants + .NUMBER_OF_TRIES_FOR_CARBON_LOCK_DEFAULT, CarbonCommonConstants + .MAX_TIMEOUT_FOR_CARBON_LOCK_DEFAULT)) { + LOGGER.info("Info: Acquired segment lock on segment: " + oneLoad.getLoadName() + ". It " + + "can be deleted as load is not in progress"); + canBeDeleted = true; + } else { + LOGGER.info("Info: Load in progress for segment" + oneLoad.getLoadName()); + canBeDeleted = false; + } + } finally { + if (segmentLock.unlock()) { + LOGGER.info("Info: Segment lock on segment:" + oneLoad.getLoadName() + " is released"); + } else { + LOGGER.error("Error: Unable to release segment lock on : " + oneLoad.getLoadName()); Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
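The fallback behaviour discussed in `getTrashFolderExpirationTime` (negative or non-numeric values of `carbon.trash.expiration.days` revert to the 3-day default, and the result is converted to milliseconds) can be condensed into a small sketch. The class name is illustrative; only the parsing rule is taken from the quoted diff.

```java
import java.util.concurrent.TimeUnit;

public class TrashRetentionConfig {

    // Matches CARBON_TRASH_EXPIRATION_DAYS_DEFAULT = "3" from the diff.
    static final int DEFAULT_DAYS = 3;

    // Parse the configured day count, falling back to the default when the
    // value is negative or not a number, then convert days to milliseconds.
    static long retentionMillis(String configured) {
        int days;
        try {
            days = Integer.parseInt(configured);
            if (days < 0) {
                days = DEFAULT_DAYS; // negative values are rejected
            }
        } catch (NumberFormatException e) {
            days = DEFAULT_DAYS; // non-numeric values are rejected
        }
        return days * TimeUnit.DAYS.toMillis(1);
    }

    public static void main(String[] args) {
        System.out.println(retentionMillis("2"));    // 172800000
        System.out.println(retentionMillis("-1"));   // falls back: 259200000
        System.out.println(retentionMillis("oops")); // falls back: 259200000
    }
}
```

One detail worth noting from the quoted code: the javadoc says "microseconds", but `TimeUnit.DAYS.toMillis(1)` yields milliseconds, which is what this sketch returns.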
In reply to this post by GitBox
vikramahuja1001 commented on a change in pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#discussion_r516790181 ########## File path: core/src/main/java/org/apache/carbondata/core/util/CleanUtil.java ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.util; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.index.IndexStoreManager; +import org.apache.carbondata.core.index.Segment; +import org.apache.carbondata.core.index.TableIndex; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.SegmentFileStore; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import org.apache.log4j.Logger; + +/** + * Mantains the code used in clean files command, to delete the load folders and move the data + * to trash folder + */ +public final class CleanUtil { + + private static final Logger LOGGER = + LogServiceFactory.getLogService(CarbonUtil.class.getName()); + + public static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable, + LoadMetadataDetails[] newAddedLoadHistoryList, boolean isForceDelete, + List<PartitionSpec> specs) { + LoadMetadataDetails[] currentDetails = + SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath()); + physicalFactAndMeasureMetadataDeletion(carbonTable, + currentDetails, + isForceDelete, + specs, + currentDetails); + if (newAddedLoadHistoryList != null && 
newAddedLoadHistoryList.length > 0) { + physicalFactAndMeasureMetadataDeletion(carbonTable, + newAddedLoadHistoryList, + isForceDelete, + specs, + currentDetails); + } + } + + /** + * Delete the invalid data physically from table. + * @param carbonTable table + * @param loadDetails Load details which need clean up + * @param isForceDelete is Force delete requested by user + * @param specs Partition specs + * @param currLoadDetails Current table status load details which are required for update manager. + */ + private static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable, + LoadMetadataDetails[] loadDetails, boolean isForceDelete, List<PartitionSpec> specs, + LoadMetadataDetails[] currLoadDetails) { + List<TableIndex> indexes = new ArrayList<>(); + try { + for (TableIndex index : IndexStoreManager.getInstance().getAllCGAndFGIndexes(carbonTable)) { + if (index.getIndexSchema().isIndex()) { + indexes.add(index); + } + } + } catch (IOException e) { + LOGGER.warn(String.format( + "Failed to get indexes for %s.%s, therefore the index files could not be cleaned.", + carbonTable.getAbsoluteTableIdentifier().getDatabaseName(), + carbonTable.getAbsoluteTableIdentifier().getTableName())); + } + SegmentUpdateStatusManager updateStatusManager = + new SegmentUpdateStatusManager(carbonTable, currLoadDetails); + for (final LoadMetadataDetails oneLoad : loadDetails) { + if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) { + try { + if (oneLoad.getSegmentFile() != null) { + SegmentFileStore.deleteSegment(carbonTable.getAbsoluteTableIdentifier().getTablePath(), + new Segment(oneLoad.getLoadName(), oneLoad.getSegmentFile()), specs, + updateStatusManager); + } else { + String path = getSegmentPath(carbonTable.getAbsoluteTableIdentifier(), oneLoad); + boolean status = false; + if (FileFactory.isFileExist(path)) { + CarbonFile file = FileFactory.getCarbonFile(path); + CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() { + + @Override + 
public boolean accept(CarbonFile file) { + return (CarbonTablePath.isCarbonDataFile(file.getName()) || + CarbonTablePath.isCarbonIndexFile(file.getName())); + } + }); + + //if there are no fact and msr metadata files present then no need to keep + //entry in metadata. + if (filesToBeDeleted.length == 0) { + status = true; + } else { + for (CarbonFile eachFile : filesToBeDeleted) { + if (!eachFile.delete()) { + LOGGER.warn("Unable to delete the file as per delete command " + eachFile + .getAbsolutePath()); + status = false; + } else { + status = true; + } + } + } + // need to delete the complete folder. + if (status) { + if (!file.delete()) { + LOGGER.warn("Unable to delete the folder as per delete command " + file + .getAbsolutePath()); + } + } + + } + } + List<Segment> segments = new ArrayList<>(1); + for (TableIndex index : indexes) { + segments.clear(); + segments.add(new Segment(oneLoad.getLoadName())); + index.deleteIndexData(segments); + } + } catch (Exception e) { + LOGGER.warn("Unable to delete the file as per delete command " + oneLoad.getLoadName()); + } + } + } + } + + private static boolean checkIfLoadCanBeDeletedPhysically(LoadMetadataDetails oneLoad, + boolean isForceDelete) { + // Check if the segment is added externally and path is set then do not delete it + if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus() + || SegmentStatus.COMPACTED == oneLoad.getSegmentStatus()) && (oneLoad.getPath() == null + || oneLoad.getPath().equalsIgnoreCase("NA"))) { + if (isForceDelete) { + return true; + } + long deletionTime = oneLoad.getModificationOrDeletionTimestamp(); + return CarbonUpdateUtil.isMaxQueryTimeoutExceeded(deletionTime); + } + return false; + } + + + /** + * returns segment path + * + * @param identifier + * @param oneLoad + * @return + */ + private static String getSegmentPath(AbsoluteTableIdentifier identifier, + LoadMetadataDetails oneLoad) { + String segmentId = oneLoad.getLoadName(); + return 
CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId); + } Review comment: done ########## File path: core/src/main/java/org/apache/carbondata/core/util/CleanUtil.java ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.util; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.index.IndexStoreManager; +import org.apache.carbondata.core.index.Segment; +import org.apache.carbondata.core.index.TableIndex; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.SegmentFileStore; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import org.apache.log4j.Logger; + +/** + * Mantains the code used in clean files command, to delete the load folders and move the data + * to trash folder + */ +public final class CleanUtil { + + private static final Logger LOGGER = + LogServiceFactory.getLogService(CarbonUtil.class.getName()); + + public static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable, Review comment: done ########## File path: core/src/main/java/org/apache/carbondata/core/util/CleanUtil.java ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.util; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.index.IndexStoreManager; +import org.apache.carbondata.core.index.Segment; +import org.apache.carbondata.core.index.TableIndex; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.SegmentFileStore; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import org.apache.log4j.Logger; + +/** + * Mantains the code used in clean files command, to delete the load folders and move the data + * to trash folder + */ +public final 
class CleanUtil { + + private static final Logger LOGGER = + LogServiceFactory.getLogService(CarbonUtil.class.getName()); + + public static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable, + LoadMetadataDetails[] newAddedLoadHistoryList, boolean isForceDelete, + List<PartitionSpec> specs) { + LoadMetadataDetails[] currentDetails = + SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath()); + physicalFactAndMeasureMetadataDeletion(carbonTable, + currentDetails, + isForceDelete, + specs, + currentDetails); + if (newAddedLoadHistoryList != null && newAddedLoadHistoryList.length > 0) { + physicalFactAndMeasureMetadataDeletion(carbonTable, + newAddedLoadHistoryList, + isForceDelete, + specs, + currentDetails); + } + } + + /** + * Delete the invalid data physically from table. + * @param carbonTable table + * @param loadDetails Load details which need clean up + * @param isForceDelete is Force delete requested by user + * @param specs Partition specs + * @param currLoadDetails Current table status load details which are required for update manager. 
+ */ + private static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable, + LoadMetadataDetails[] loadDetails, boolean isForceDelete, List<PartitionSpec> specs, + LoadMetadataDetails[] currLoadDetails) { + List<TableIndex> indexes = new ArrayList<>(); + try { + for (TableIndex index : IndexStoreManager.getInstance().getAllCGAndFGIndexes(carbonTable)) { + if (index.getIndexSchema().isIndex()) { + indexes.add(index); + } + } + } catch (IOException e) { + LOGGER.warn(String.format( + "Failed to get indexes for %s.%s, therefore the index files could not be cleaned.", + carbonTable.getAbsoluteTableIdentifier().getDatabaseName(), + carbonTable.getAbsoluteTableIdentifier().getTableName())); + } + SegmentUpdateStatusManager updateStatusManager = + new SegmentUpdateStatusManager(carbonTable, currLoadDetails); + for (final LoadMetadataDetails oneLoad : loadDetails) { + if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) { + try { + if (oneLoad.getSegmentFile() != null) { + SegmentFileStore.deleteSegment(carbonTable.getAbsoluteTableIdentifier().getTablePath(), + new Segment(oneLoad.getLoadName(), oneLoad.getSegmentFile()), specs, + updateStatusManager); + } else { + String path = getSegmentPath(carbonTable.getAbsoluteTableIdentifier(), oneLoad); + boolean status = false; + if (FileFactory.isFileExist(path)) { + CarbonFile file = FileFactory.getCarbonFile(path); + CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() { + + @Override + public boolean accept(CarbonFile file) { + return (CarbonTablePath.isCarbonDataFile(file.getName()) || + CarbonTablePath.isCarbonIndexFile(file.getName())); + } + }); + + //if there are no fact and msr metadata files present then no need to keep + //entry in metadata. 
+ if (filesToBeDeleted.length == 0) { + status = true; + } else { + for (CarbonFile eachFile : filesToBeDeleted) { + if (!eachFile.delete()) { + LOGGER.warn("Unable to delete the file as per delete command " + eachFile + .getAbsolutePath()); + status = false; + } else { + status = true; + } + } + } + // need to delete the complete folder. + if (status) { + if (!file.delete()) { + LOGGER.warn("Unable to delete the folder as per delete command " + file + .getAbsolutePath()); + } + } + + } + } + List<Segment> segments = new ArrayList<>(1); + for (TableIndex index : indexes) { + segments.clear(); Review comment: done ########## File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ########## @@ -1427,6 +1427,23 @@ private CarbonCommonConstants() { public static final String BITSET_PIPE_LINE_DEFAULT = "true"; + /** + * this is the user defined time(in days), when a specific timestamp subdirectory in + * trash folder will expire + */ + @CarbonProperty + public static final String CARBON_TRASH_EXPIRATION_DAYS = "carbon.trash.expiration.days"; + + /** + * Default expiration time of trash folder is 3 days. + */ + public static final String CARBON_TRASH_EXPIRATION_DAYS_DEFAULT = "3"; + + /** + * Trash folder temp name + */ + public static final String CARBON_TRASH_FOLDER_NAME = ".Trash"; Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
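The constants added above (`carbon.trash.expiration.days`, default `"3"`, folder name `.Trash`) drive a simple age check: a timestamp-named subdirectory under `.Trash` becomes eligible for deletion once `now - timestamp` exceeds the retention window. A hedged sketch of that predicate (method and parameter names are illustrative, not the actual CarbonData API):

```java
import java.util.concurrent.TimeUnit;

public class TrashExpiryCheck {
  /**
   * A trash subdirectory is named by the clean-files timestamp (epoch millis).
   * It expires once it is older than the configured retention in days.
   */
  static boolean isExpired(long folderTimestampMillis, long nowMillis, int retentionDays) {
    long retentionMillis = TimeUnit.DAYS.toMillis(retentionDays);
    return nowMillis - folderTimestampMillis > retentionMillis;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    System.out.println(isExpired(now - TimeUnit.DAYS.toMillis(4), now, 3)); // true
    System.out.println(isExpired(now - TimeUnit.DAYS.toMillis(1), now, 3)); // false
  }
}
```

An expired subdirectory is then removed in the next `CLEAN FILES` run, as described in the quoted docs.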

vikramahuja1001 commented on a change in pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#discussion_r516791220 ########## File path: core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.util.path; + +import java.io.File; +import java.io.IOException; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.exception.CarbonFileException; +import org.apache.carbondata.core.util.CarbonUtil; + +import org.apache.commons.io.FileUtils; + +import org.apache.log4j.Logger; + +/** + * Mantains the trash folder in carbondata. This class has methods to copy data to the trash and + * remove data from the trash. 
+ */ +public final class TrashUtil { + + private static final Logger LOGGER = + LogServiceFactory.getLogService(CarbonUtil.class.getName()); + + /** + * The below method copies the complete a file to the trash folder. Provide necessary + * timestamp and the segment number in the suffixToAdd variable, so that the proper folder is + * created in the trash folder. + * + * @param carbonTablePath table path of the carbon table + * @param pathOfFileToCopy the files which are to be moved to the trash folder + * @param suffixToAdd timestamp, partition folder(if any) and segment number + * @return + */ + public static void copyDataToTrashFolderByFile(String carbonTablePath, String pathOfFileToCopy, + String suffixToAdd) { + String trashFolderPath = CarbonTablePath.getTrashFolderPath(carbonTablePath) + + CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd; + try { + if (new File(pathOfFileToCopy).exists()) { + FileUtils.copyFileToDirectory(new File(pathOfFileToCopy), new File(trashFolderPath)); + LOGGER.info("File: " + pathOfFileToCopy + " successfully copied to the trash folder: " + + trashFolderPath); + } + } catch (IOException e) { + LOGGER.error("Unable to copy " + pathOfFileToCopy + " to the trash folder", e); + } + } + + /** + * The below method copies the complete segment folder to the trash folder. Provide necessary + * timestamp and the segment number in the suffixToAdd variable, so that the proper folder is + * created in the trash folder. 
+ * + * @param carbonTablePath table path of the carbon table + * @param path the folder which are to be moved to the trash folder + * @param suffixToAdd timestamp, partition folder(if any) and segment number + * @return + */ + public static void copyDataToTrashBySegment(CarbonFile path, String carbonTablePath, + String suffixToAdd) { + String trashFolderPath = CarbonTablePath.getTrashFolderPath(carbonTablePath) + + CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd; + try { + FileUtils.copyDirectory(new File(path.getAbsolutePath()), new File(trashFolderPath)); + LOGGER.info("Segment: " + path.getAbsolutePath() + " has been copied to the trash folder" + + " successfully"); + } catch (IOException e) { + LOGGER.error("Unable to create the trash folder and copy data to it", e); + } + } + + /** + * The below method deletes timestamp subdirectories in the trash folder which have expired as + * per the user defined expiration time + */ + public static void deleteAllDataFromTrashFolderByTimeStamp(String carbonTablePath, Long timeStamp) Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
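`copyDataToTrashFolderByFile` above builds a `<tablePath>/.Trash/<timestamp>/<segment>` destination and copies the file there. A minimal local-filesystem sketch of the same idea using `java.nio` — the real implementation goes through CarbonData's `FileFactory`/Commons IO `FileUtils` so it also works on HDFS and S3, and the names below are illustrative:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class TrashCopySketch {
  static final String TRASH_FOLDER = ".Trash"; // mirrors CARBON_TRASH_FOLDER_NAME

  /** Copy a file into tablePath/.Trash/&lt;suffix&gt;/, creating directories as needed. */
  static Path copyToTrash(Path tablePath, Path fileToCopy, String suffix) throws IOException {
    Path trashDir = tablePath.resolve(TRASH_FOLDER).resolve(suffix);
    Files.createDirectories(trashDir);
    Path target = trashDir.resolve(fileToCopy.getFileName());
    // copy rather than move: clean files deletes the original in a separate step
    return Files.copy(fileToCopy, target, StandardCopyOption.REPLACE_EXISTING);
  }

  public static void main(String[] args) throws IOException {
    Path table = Files.createTempDirectory("carbon_table");
    Path dataFile = Files.createTempFile(table, "part-0", ".carbondata");
    Path copied = copyToTrash(table, dataFile, System.currentTimeMillis() + "/Segment_0");
    System.out.println(Files.exists(copied)); // true
  }
}
```

The timestamp-plus-segment suffix is what lets users list trashed segments by the clean-files invocation time, as the docs in this thread describe.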
vikramahuja1001 commented on a change in pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#discussion_r516792560 ########## File path: docs/cleanfiles.md ########## @@ -0,0 +1,78 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to you under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + + +## CLEAN FILES + +Clean files command is used to remove the Compacted, Marked For Delete ,In Progress which are stale and Partial(Segments which are missing from the table status file but their data is present) Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
vikramahuja1001 commented on a change in pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#discussion_r516792757 ########## File path: docs/cleanfiles.md ########## @@ -0,0 +1,78 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to you under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + + +## CLEAN FILES + +Clean files command is used to remove the Compacted, Marked For Delete ,In Progress which are stale and Partial(Segments which are missing from the table status file but their data is present) + segments from the store. + + Clean Files Command + ``` + CLEAN FILES FOR TABLE TABLE_NAME + ``` + + +### TRASH FOLDER + + Carbondata supports a Trash Folder which is used as a redundant folder where all stale carbondata segments are moved to during clean files operation. + This trash folder is mantained inside the table path. It is a hidden folder(.Trash). The segments that are moved to the trash folder are mantained under a timestamp + subfolder(timestamp at which clean files operation is called). This helps the user to list down segments by timestamp. 
By default all the timestamp sub-directory have an expiration + time of (3 days since that timestamp) and it can be configured by the user using the following carbon property Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
vikramahuja1001 commented on a change in pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#discussion_r516793022 ########## File path: docs/cleanfiles.md ########## @@ -0,0 +1,78 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to you under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + + +## CLEAN FILES + +Clean files command is used to remove the Compacted, Marked For Delete ,In Progress which are stale and Partial(Segments which are missing from the table status file but their data is present) + segments from the store. + + Clean Files Command + ``` + CLEAN FILES FOR TABLE TABLE_NAME + ``` + + +### TRASH FOLDER + + Carbondata supports a Trash Folder which is used as a redundant folder where all stale carbondata segments are moved to during clean files operation. + This trash folder is mantained inside the table path. It is a hidden folder(.Trash). The segments that are moved to the trash folder are mantained under a timestamp + subfolder(timestamp at which clean files operation is called). This helps the user to list down segments by timestamp. 
By default all the timestamp sub-directory have an expiration + time of (3 days since that timestamp) and it can be configured by the user using the following carbon property + ``` + carbon.trash.expiration.time = "Number of days" + ``` + Once the timestamp subdirectory is expired as per the configured expiration day value, the subdirectory is deleted from the trash folder in the subsequent clean files command. + + + + +### DRY RUN + Support for dry run is provided before the actual clean files operation. This dry run operation will list down all the segments which are going to be manipulated during + the clean files operation. The dry run result will show the current location of the segment(it can be in FACT folder, Partition folder or trash folder), where that segment + will be moved(to the trash folder or deleted from store) and the number of days left before it expires once the actual operation will be called. + + + ``` + CLEAN FILES FOR TABLE TABLE_NAME options('dry_run'='true') + ``` + +### FORCE DELETE TRASH +The force option with clean files command deletes all the files and folders from the trash folder. Review comment: no other difference, it just force clean trash folder ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
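The dry-run section quoted above says the output includes "the number of days left before it expires". That figure falls out of the same timestamp arithmetic as the expiry check; a self-contained sketch (names are illustrative, not the actual dry-run implementation):

```java
import java.util.concurrent.TimeUnit;

public class DryRunDaysLeft {
  /** Days remaining before a trash subdirectory expires; 0 when already expired. */
  static long daysLeft(long folderTimestampMillis, long nowMillis, int retentionDays) {
    long expiresAt = folderTimestampMillis + TimeUnit.DAYS.toMillis(retentionDays);
    long remainingMillis = Math.max(0L, expiresAt - nowMillis);
    // round up, so a folder expiring later today still reports 1 day left
    long dayMillis = TimeUnit.DAYS.toMillis(1);
    return (remainingMillis + dayMillis - 1) / dayMillis;
  }

  public static void main(String[] args) {
    // folder created at t=0, retention 3 days, one day has passed
    System.out.println(daysLeft(0L, TimeUnit.DAYS.toMillis(1), 3)); // 2
    // four days have passed: already expired
    System.out.println(daysLeft(0L, TimeUnit.DAYS.toMillis(4), 3)); // 0
  }
}
```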
vikramahuja1001 commented on a change in pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#discussion_r516793523 ########## File path: docs/dml-of-carbondata.md ########## @@ -27,6 +27,7 @@ CarbonData DML statements are documented here,which includes: * [UPDATE AND DELETE](#update-and-delete) * [COMPACTION](#compaction) * [SEGMENT MANAGEMENT](./segment-management-on-carbondata.md) +* [CLEAN FILES](./cleanfiles.md) Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
vikramahuja1001 commented on a change in pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#discussion_r516793794 ########## File path: docs/dml-of-carbondata.md ########## @@ -560,5 +561,4 @@ CarbonData DML statements are documented here,which includes: Clean the segments which are compacted: ``` - CLEAN FILES FOR TABLE carbon_table - ``` Review comment: added back ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
vikramahuja1001 commented on a change in pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#discussion_r516795339 ########## File path: core/src/main/java/org/apache/carbondata/core/util/CleanUtil.java ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.util; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.index.IndexStoreManager; +import org.apache.carbondata.core.index.Segment; +import org.apache.carbondata.core.index.TableIndex; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.SegmentFileStore; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import org.apache.log4j.Logger; + +/** + * Mantains the code used in clean files command, to delete the load folders and move the data + * to trash folder + */ +public final class CleanUtil { Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
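`checkIfLoadCanBeDeletedPhysically` in the `CleanUtil` diff combines three conditions: the segment must be COMPACTED or MARKED_FOR_DELETE, must not be an externally added segment (path `null` or `"NA"`), and must either be force-deleted or have been deleted longer ago than the max query timeout. A self-contained sketch of that predicate — the enum and timeout below are simplified stand-ins for the CarbonData types, not the real API:

```java
public class DeletableSegmentCheck {
  enum SegmentStatus { SUCCESS, COMPACTED, MARKED_FOR_DELETE, INSERT_IN_PROGRESS }

  // stand-in for CarbonUpdateUtil.isMaxQueryTimeoutExceeded's window (assumed 1 hour)
  static final long MAX_QUERY_TIMEOUT_MILLIS = 60L * 60 * 1000;

  static boolean canBeDeletedPhysically(SegmentStatus status, String externalPath,
      boolean forceDelete, long deletionTimeMillis, long nowMillis) {
    boolean staleStatus =
        status == SegmentStatus.MARKED_FOR_DELETE || status == SegmentStatus.COMPACTED;
    // externally added segments (with a real path) are never deleted here
    boolean managedByTable = externalPath == null || externalPath.equalsIgnoreCase("NA");
    if (staleStatus && managedByTable) {
      if (forceDelete) {
        return true;
      }
      // otherwise wait until in-flight queries can no longer read the segment
      return nowMillis - deletionTimeMillis > MAX_QUERY_TIMEOUT_MILLIS;
    }
    return false;
  }
}
```

Passing `nowMillis` explicitly (instead of calling `System.currentTimeMillis()` inside) keeps the predicate deterministic and easy to unit test.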
vikramahuja1001 commented on a change in pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#discussion_r516796678 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala ########## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.cleanfiles + +import java.sql.Timestamp +import java.util +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.index.CarbonIndexUtil + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.exception.ConcurrentOperationException +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore} +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.mutate.CarbonUpdateUtil +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil} +import org.apache.carbondata.processing.loading.TableProcessingOperations + +object CleanFilesUtil { + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + /** + * The method deletes all data if forceTableClean <true> and clean garbage segment + * (MARKED_FOR_DELETE state) if forceTableClean <false> + * + * @param dbName : Database name + * @param tableName : Table name + * @param tablePath : Table path + * @param carbonTable : CarbonTable Object <null> in case of force clean + * @param forceTableClean : <true> for force clean it will delete all data + * <false> it will clean garbage segment (MARKED_FOR_DELETE state) + 
* @param currentTablePartitions : Hive Partitions details + */ + def cleanFiles( + dbName: String, + tableName: String, + tablePath: String, + carbonTable: CarbonTable, + forceTableClean: Boolean, + currentTablePartitions: Option[Seq[PartitionSpec]] = None, + truncateTable: Boolean = false): Unit = { + var carbonCleanFilesLock: ICarbonLock = null + val absoluteTableIdentifier = if (forceTableClean) { + AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName) + } else { + carbonTable.getAbsoluteTableIdentifier + } + try { + val errorMsg = "Clean files request is failed for " + + s"$dbName.$tableName" + + ". Not able to acquire the clean files lock due to another clean files " + + "operation is running in the background." + // in case of force clean the lock is not required + if (forceTableClean) { + FileFactory.deleteAllCarbonFilesOfDir( + FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath)) + } else { + carbonCleanFilesLock = + CarbonLockUtil + .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg) + if (truncateTable) { + SegmentStatusManager.truncateTable(carbonTable) + } + SegmentStatusManager.deleteLoadsAndUpdateMetadata( + carbonTable, true, currentTablePartitions.map(_.asJava).orNull) + CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true) + currentTablePartitions match { + case Some(partitions) => + SegmentFileStore.cleanSegments( + carbonTable, + currentTablePartitions.map(_.asJava).orNull, + true) + case _ => + } + } + } finally { + if (currentTablePartitions.equals(None)) { + cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec]) + } else { + cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList) + } + + if (carbonCleanFilesLock != null) { + CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK) + } + } + } + + + /** + * delete partition folders recursively + * + * @param carbonTable + * @param partitionSpecList + */ + def 
cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable, + partitionSpecList: List[PartitionSpec]): Unit = { + if (carbonTable != null && carbonTable.isHivePartitionTable) { + val loadMetadataDetails = SegmentStatusManager + .readLoadMetadata(carbonTable.getMetadataPath) + + val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath) + + // list all files from table path + val listOfDefaultPartFilesIterator = carbonFile.listFiles(true) + loadMetadataDetails.foreach { metadataDetail => + if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) && + metadataDetail.getSegmentFile == null) { + val loadStartTime: Long = metadataDetail.getLoadStartTime + // delete all files of @loadStartTime from table path + cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime) + partitionSpecList.foreach { + partitionSpec => + val partitionLocation = partitionSpec.getLocation + // For partition folder outside the tablePath + if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) { + val partitionCarbonFile = FileFactory + .getCarbonFile(partitionLocation.toString) + // list all files from partitionLocation + val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true) + // delete all files of @loadStartTime from externalPath + cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime) + } + } + } + } + } + } + + /** + * Compare CarbonFile Timestamp and delete files. + * + * @param carbonFiles + * @param timestamp + */ + private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile], Review comment: this just directly delete by timestamp ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
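The `cleanCarbonFilesInFolder` helper discussed above ("this just directly delete by timestamp") compares a timestamp embedded in each file name against the segment's `loadStartTime` and deletes the matches. A minimal sketch of that comparison, assuming a file naming scheme that ends in `-<loadStartTime>.<extension>` (the exact CarbonData naming is handled by `CarbonTablePath.DataFileUtil` and may differ):

```java
public class TimestampMatchSketch {
    // Extract the trailing timestamp token from a file name of the assumed
    // form "...-<loadStartTime>.carbondata" and compare it with the load
    // start time recorded in the table status. Returns true when the file
    // belongs to that load and is therefore a deletion candidate.
    static boolean matchesLoadTimestamp(String fileName, long loadStartTime) {
        int dot = fileName.lastIndexOf('.');
        if (dot < 0) {
            return false;
        }
        String base = fileName.substring(0, dot);
        int dash = base.lastIndexOf('-');
        if (dash < 0) {
            return false;
        }
        try {
            return Long.parseLong(base.substring(dash + 1)) == loadStartTime;
        } catch (NumberFormatException e) {
            // name does not carry a parseable timestamp; leave the file alone
            return false;
        }
    }
}
```

The actual helper iterates the listed `CarbonFile`s and calls `FileFactory.deleteFile` for every name that matches; the sketch isolates just the matching predicate.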
vikramahuja1001 commented on a change in pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#discussion_r516795339 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala ########## @@ -0,0 +1,416 @@
cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable, + partitionSpecList: List[PartitionSpec]): Unit = { + if (carbonTable != null && carbonTable.isHivePartitionTable) { + val loadMetadataDetails = SegmentStatusManager + .readLoadMetadata(carbonTable.getMetadataPath) + + val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath) + + // list all files from table path + val listOfDefaultPartFilesIterator = carbonFile.listFiles(true) + loadMetadataDetails.foreach { metadataDetail => + if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) && + metadataDetail.getSegmentFile == null) { + val loadStartTime: Long = metadataDetail.getLoadStartTime + // delete all files of @loadStartTime from table path + cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime) + partitionSpecList.foreach { + partitionSpec => + val partitionLocation = partitionSpec.getLocation + // For partition folder outside the tablePath + if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) { + val partitionCarbonFile = FileFactory + .getCarbonFile(partitionLocation.toString) + // list all files from partitionLocation + val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true) + // delete all files of @loadStartTime from externalPath + cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime) + } + } + } + } + } + } + + /** + * Compare CarbonFile Timestamp and delete files. + * + * @param carbonFiles + * @param timestamp + */ + private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile], + timestamp: Long): Unit = { + carbonFiles.asScala.foreach { carbonFile => + val filePath = carbonFile.getPath + val fileName = carbonFile.getName + if (CarbonTablePath.DataFileUtil.compareCarbonFileTimeStamp(fileName, timestamp)) { + FileFactory.deleteFile(filePath) + } + } + } + + /** + * The in-progress segments which are in stale state will be marked as deleted + * when driver is initializing. 
+ * + * @param databaseLocation + * @param dbName + */ + def cleanInProgressSegments(databaseLocation: String, dbName: String): Unit = { + val loaderDriver = CarbonProperties.getInstance(). + getProperty(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER, + CarbonCommonConstants.DATA_MANAGEMENT_DRIVER_DEFAULT).toBoolean + if (!loaderDriver) { + return + } + try { + if (FileFactory.isFileExist(databaseLocation)) { + val file = FileFactory.getCarbonFile(databaseLocation) + if (file.isDirectory) { + val tableFolders = file.listFiles() + tableFolders.foreach { tableFolder => + if (tableFolder.isDirectory) { + val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + + tableFolder.getName + val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableFolder.getName) + val tableStatusFile = + CarbonTablePath.getTableStatusFilePath(tablePath) + if (FileFactory.isFileExist(tableStatusFile)) { + try { + val carbonTable = CarbonMetadata.getInstance.getCarbonTable(tableUniqueName) + SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true, null) + } catch { + case _: Exception => + LOGGER.warn(s"Error while cleaning table " + s"$tableUniqueName") + } + } + } + } + } + } + } catch { + case s: java.io.FileNotFoundException => + LOGGER.error(s) + } + } + + /** + * The below method deletes all the files and folders in the trash folders of all carbon tables + * in all databases + */ + def deleteDataFromTrashFolderInAllTables(sparkSession: SparkSession): Unit = { + try { + val databases = sparkSession.sessionState.catalog.listDatabases() + databases.foreach(dbName => { + val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession) + if (FileFactory.isFileExist(databaseLocation)) { + val file = FileFactory.getCarbonFile(databaseLocation) + if (file.isDirectory) { + val tableFolders = file.listFiles() + tableFolders.foreach { tableFolder => + if (tableFolder.isDirectory) { + val tablePath = databaseLocation + + CarbonCommonConstants.FILE_SEPARATOR 
+ tableFolder.getName + TrashUtil.deleteAllDataFromTrashFolder(tablePath) + } + } + } + } + }) + } catch { + case e: Throwable => + // catch all exceptions to avoid failure + LOGGER.error("Failed to clear trash folder of all tables", e) + } + } + /** + * The below method deletes all the files and folders in trash folder in a carbontable and all + * it's index tables + */ + def deleteDataFromTrashFolder(carbonTable: CarbonTable, sparkSession: SparkSession): Unit = { + TrashUtil.deleteAllDataFromTrashFolder(carbonTable.getTablePath) + // check for index tables + val indexTables = CarbonIndexUtil + .getIndexCarbonTables(carbonTable, sparkSession) + indexTables.foreach { indexTable => + TrashUtil.deleteAllDataFromTrashFolder(indexTable.getTablePath) + } + } + + /** + * The below method deletes the timestamp sub directory in the trash folder based on the + * expiration day + */ + def deleteDataFromTrashFolderByTimeStamp(carbonTable: CarbonTable, sparkSession: Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
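`deleteDataFromTrashFolderByTimeStamp` above removes only the timestamp subdirectories that have passed the configured expiration time (`carbon.trash.expiration.time`, defaulting to 3 days per docs/cleanfiles.md). A minimal sketch of that expiry check, assuming each trash subdirectory is named after the epoch-millis at which clean files ran; `TrashExpirySketch` is an illustrative stand-in, not the real `TrashUtil`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class TrashExpirySketch {
    // A timestamp subdirectory is expired once it is older than the
    // configured number of days relative to "now".
    static boolean isExpired(long subDirTimestampMillis, long nowMillis, long expirationDays) {
        return nowMillis - subDirTimestampMillis > TimeUnit.DAYS.toMillis(expirationDays);
    }

    // Given the names of the trash subdirectories (epoch-millis strings),
    // return the ones the next clean files run would delete. Entries that
    // are not parseable timestamps are skipped rather than deleted.
    static List<String> expiredSubDirs(List<String> names, long nowMillis, long expirationDays) {
        List<String> expired = new ArrayList<>();
        for (String name : names) {
            try {
                if (isExpired(Long.parseLong(name), nowMillis, expirationDays)) {
                    expired.add(name);
                }
            } catch (NumberFormatException ignored) {
                // unrecognised subdirectory name; leave it in place
            }
        }
        return expired;
    }
}
```

In the PR the same sweep is also applied to each index table's trash folder, since index tables keep their own table paths.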
vikramahuja1001 commented on a change in pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#discussion_r516798306 ########## File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala ########## @@ -0,0 +1,416 @@
+ tableFolder.getName + TrashUtil.deleteAllDataFromTrashFolder(tablePath) + } + } + } + } + }) + } catch { + case e: Throwable => + // catch all exceptions to avoid failure + LOGGER.error("Failed to clear trash folder of all tables", e) + } + } + /** + * The below method deletes all the files and folders in trash folder in a carbontable and all + * it's index tables + */ + def deleteDataFromTrashFolder(carbonTable: CarbonTable, sparkSession: SparkSession): Unit = { + TrashUtil.deleteAllDataFromTrashFolder(carbonTable.getTablePath) + // check for index tables + val indexTables = CarbonIndexUtil + .getIndexCarbonTables(carbonTable, sparkSession) + indexTables.foreach { indexTable => + TrashUtil.deleteAllDataFromTrashFolder(indexTable.getTablePath) + } + } + + /** + * The below method deletes the timestamp sub directory in the trash folder based on the + * expiration day + */ + def deleteDataFromTrashFolderByTimeStamp(carbonTable: CarbonTable, sparkSession: + SparkSession): Unit = { + val expirationDay = CarbonProperties.getInstance().getTrashFolderExpirationTime + TrashUtil.deleteAllDataFromTrashFolderByTimeStamp(carbonTable.getTablePath, expirationDay) + // check for index tables + val indexTables = CarbonIndexUtil + .getIndexCarbonTables(carbonTable, sparkSession) + indexTables.foreach { indexTable => + TrashUtil.deleteAllDataFromTrashFolderByTimeStamp(indexTable.getTablePath, expirationDay) + } + } + + /** + * Actual dry run operation. It will also do dry run for stale segments + */ + def cleanFilesDryRunOp(carbonTable: CarbonTable, sparkSession: SparkSession): Seq[Row] = { Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [hidden email] |
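`cleanFilesDryRunOp` above reports which segments a real clean files call would manipulate, including stale segments, without touching any data. As a rough illustration of the "list, don't delete" idea, here is a Java sketch; the status names and row shape are assumptions for the example, not CarbonData's actual `SegmentStatus` output:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CleanFilesDryRunSketch {
    // Statuses that clean files would act on (illustrative set; the real
    // command also handles stale and partial segments).
    static final List<String> CLEANABLE =
        Arrays.asList("COMPACTED", "MARKED_FOR_DELETE", "INSERT_IN_PROGRESS");

    // Dry run: each input entry is {segmentId, status}. Returns a report of
    // the segments a real clean files call would touch; nothing is deleted.
    static List<String> dryRun(List<String[]> segments) {
        List<String> candidates = new ArrayList<>();
        for (String[] segment : segments) {
            if (CLEANABLE.contains(segment[1])) {
                candidates.add(segment[0] + " (" + segment[1] + ")");
            }
        }
        return candidates;
    }
}
```

Returning the report instead of acting on it is what lets users inspect the effect of clean files before running the destructive form.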