[GitHub] [carbondata] vikramahuja1001 opened a new pull request #4005: [WIP] Trash Folder support in carbondata

classic Classic list List threaded Threaded
147 messages Options
123456 ... 8
Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox

vikramahuja1001 commented on a change in pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#discussion_r527482978



##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.locks.CarbonLockFactory;
+import org.apache.carbondata.core.locks.CarbonLockUtil;
+import org.apache.carbondata.core.locks.ICarbonLock;
+import org.apache.carbondata.core.locks.LockUsage;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.log4j.Logger;
+
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonUtil.class.getName());
+  private static List<CarbonFile> filesInPartitionFolder = new ArrayList<CarbonFile>();
+
+  /**
+   *
+   * This method will clean the stale segments and move them to the trash folder.
+   */
+  public static void cleanStaleSegmentsNonPartitionTable(CarbonTable carbonTable)
+     throws IOException {
+    String metaDataLocation = carbonTable.getMetadataPath();
+    String partitionPath = CarbonTablePath.getPartitionDir(carbonTable.getTablePath());
+    String timeStampForTrashFolder = String.valueOf(new Timestamp(System.currentTimeMillis())
+        .getTime());
+    if (FileFactory.isFileExist(partitionPath)) {
+      // list all segments before reading tablestatus file.
+      CarbonFile[] allSegments = FileFactory.getCarbonFile(partitionPath).listFiles();
+      // there is no segment
+      if (allSegments == null || allSegments.length == 0) {
+        return;
+      }
+      int retryCount = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+          CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+      int maxTimeout = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+          CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
+      ICarbonLock carbonTableStatusLock = CarbonLockFactory
+          .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK);
+      try {
+        if (carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout)) {
+          LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+          // there is no segment or failed to read tablestatus file.
+          // so it should stop immediately.
+          // Do not stop if the tablestatus file does not exist, it can have stale segments.
+          if (FileFactory.getCarbonFile(carbonTable.getMetadataPath() + CarbonCommonConstants
+              .FILE_SEPARATOR + CarbonTablePath.TABLE_STATUS_FILE).exists() && (details == null
+              || details.length == 0)) {
+            return;
+          }
+          HashMap<CarbonFile, String> staleSegments = getStaleSegmentsNormalTable(details,
+              allSegments);
+          // move these segments one by one to the trash folder
+          for (Map.Entry<CarbonFile, String> entry : staleSegments.entrySet()) {
+            TrashUtil.copySegmentToTrash(entry.getKey(), carbonTable.getTablePath(),
+                timeStampForTrashFolder + CarbonCommonConstants.FILE_SEPARATOR +
+                CarbonCommonConstants.LOAD_FOLDER + entry.getValue());
+            LOGGER.info("Deleting Segment: " + entry.getKey().getAbsolutePath());
+            try {
+              CarbonUtil.deleteFoldersAndFiles(entry.getKey());
+              LOGGER.info("Deleting Segment: "  + entry.getKey());
+            } catch (IOException | InterruptedException e) {
+              LOGGER.error("Unable to delete the given path :: " + e.getMessage(), e);
+            }
+          }
+          if (staleSegments.size() > 0) {
+            // get the segment metadata path
+            String segmentFilesLocation =
+                CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath());
+            // delete the segment metadata files also
+            CarbonFile[] staleSegmentMetadataFiles = FileFactory.getCarbonFile(segmentFilesLocation)
+                .listFiles(file -> (staleSegments.containsValue(file.getName()
+                .split(CarbonCommonConstants.UNDERSCORE)[0])));
+            for (CarbonFile staleSegmentMetadataFile : staleSegmentMetadataFiles) {
+              staleSegmentMetadataFile.delete();
+            }
+          }
+        } else {
+          String errorMessage =
+              "Not able to acquire the Table status lock for cleaning stale segments for table "
+              + carbonTable.getDatabaseName() + "." + carbonTable.getTableName();
+          LOGGER.error(errorMessage);
+        }
+      } finally {
+        carbonTableStatusLock.unlock();
+      }
+    }
+  }
+
+  /**
+   *
+   * This method will copy data files for partition table to the trash folder, file by file.
+   */
+  public static List<String> copyPartitionTableFilesToTrash(String segmentFile,
+      String timeStampForTrashFolder, String segmentNo, CarbonTable carbonTable,
+      String indexMergeFile) throws IOException {
+    // search for indexMergeFile everywhere and segmentFile in the carbondata file and
+    // copy to trash folder
+    List<String> filesToDelete = new ArrayList<>();
+    for (CarbonFile fileList : filesInPartitionFolder) {
+      String nameofFile = fileList.getName().replace(CarbonCommonConstants.HYPHEN,
+          CarbonCommonConstants.UNDERSCORE);
+      String nameOfSegment = segmentFile.substring(0, segmentFile.indexOf(
+          CarbonCommonConstants.POINT));
+      String partitionValue = fileList.getAbsolutePath().split(CarbonCommonConstants
+          .FILE_SEPARATOR)[fileList.getAbsolutePath().split(CarbonCommonConstants
+          .FILE_SEPARATOR).length - 2];
+      if (fileList.getName().equals(indexMergeFile)) {
+        TrashUtil.copyFileToTrashFolder(carbonTable.getTablePath(),
+            fileList.getAbsolutePath(), timeStampForTrashFolder + CarbonCommonConstants
+            .FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + segmentNo +
+            CarbonCommonConstants.FILE_SEPARATOR + partitionValue);
+        filesToDelete.add(fileList.getAbsolutePath());
+      } else if (nameofFile.contains(nameOfSegment) && fileList.getName().endsWith(
+          CarbonTablePath.INDEX_FILE_EXT)) {
+        TrashUtil.copyFileToTrashFolder(carbonTable.getTablePath(),
+            fileList.getAbsolutePath(), timeStampForTrashFolder + CarbonCommonConstants
+            .FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + segmentNo +
+            CarbonCommonConstants.FILE_SEPARATOR + partitionValue);
+        filesToDelete.add(fileList.getAbsolutePath());
+      } else if (nameofFile.contains(nameOfSegment) && fileList.getName().endsWith(
+          CarbonCommonConstants.FACT_FILE_EXT)) {
+        // carbondata file found
+        TrashUtil.copyFileToTrashFolder(carbonTable.getTablePath(), fileList
+            .getAbsolutePath(), timeStampForTrashFolder + CarbonCommonConstants
+            .FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + segmentNo +
+            CarbonCommonConstants.FILE_SEPARATOR + partitionValue);
+        filesToDelete.add(fileList.getAbsolutePath());
+      }
+    }
+    return filesToDelete;
+  }
+
+  /**
+   *
+   * This method will find all the stale segments in normal table flow.
+   */
+  public static HashMap<CarbonFile, String> getStaleSegmentsNormalTable(LoadMetadataDetails[]
+      details, CarbonFile[] allSegments) {
+    Set<String> metadataSet = new HashSet<>(details.length);
+    for (LoadMetadataDetails detail : details) {
+      metadataSet.add(detail.getLoadName());
+    }
+    HashMap<CarbonFile, String> staleSegments = new HashMap<>(allSegments.length);
+    for (CarbonFile segment : allSegments) {
+      String segmentName = segment.getName();
+      // check segment folder pattern
+      if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) {
+        String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE);
+        if (parts.length == 2) {
+          boolean isOriginal = !parts[1].contains(".");
+          // in loading flow,
+          // it should be original segment and segment metadata doesn't exists
+          if (isOriginal && !metadataSet.contains(parts[1])) {
+            staleSegments.put(segment, parts[1]);
+          }
+        }
+      }
+    }
+    return staleSegments;
+  }
+
+  /**
+   *
+   * This method will find all the stale segments for partition table flow.
+   */
+  public static List<String> getStaleSegmentsPartitionTable(LoadMetadataDetails[] details,
+      Map<String, String> detailName, CarbonFile[] allSegment) {
+    List<String> staleSegments = new ArrayList<>();

Review comment:
       add a capacity




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#discussion_r527484205



##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.locks.CarbonLockFactory;
+import org.apache.carbondata.core.locks.CarbonLockUtil;
+import org.apache.carbondata.core.locks.ICarbonLock;
+import org.apache.carbondata.core.locks.LockUsage;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.log4j.Logger;
+
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonUtil.class.getName());
+  private static List<CarbonFile> filesInPartitionFolder = new ArrayList<CarbonFile>();
+
+  /**
+   *
+   * This method will clean the stale segments and move them to the trash folder.
+   */
+  public static void cleanStaleSegmentsNonPartitionTable(CarbonTable carbonTable)
+     throws IOException {
+    String metaDataLocation = carbonTable.getMetadataPath();
+    String partitionPath = CarbonTablePath.getPartitionDir(carbonTable.getTablePath());
+    String timeStampForTrashFolder = String.valueOf(new Timestamp(System.currentTimeMillis())
+        .getTime());
+    if (FileFactory.isFileExist(partitionPath)) {
+      // list all segments before reading tablestatus file.
+      CarbonFile[] allSegments = FileFactory.getCarbonFile(partitionPath).listFiles();
+      // there is no segment
+      if (allSegments == null || allSegments.length == 0) {
+        return;
+      }
+      int retryCount = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+          CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+      int maxTimeout = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+          CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
+      ICarbonLock carbonTableStatusLock = CarbonLockFactory
+          .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK);
+      try {
+        if (carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout)) {
+          LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+          // there is no segment or failed to read tablestatus file.
+          // so it should stop immediately.
+          // Do not stop if the tablestatus file does not exist, it can have stale segments.
+          if (FileFactory.getCarbonFile(carbonTable.getMetadataPath() + CarbonCommonConstants
+              .FILE_SEPARATOR + CarbonTablePath.TABLE_STATUS_FILE).exists() && (details == null
+              || details.length == 0)) {
+            return;
+          }
+          HashMap<CarbonFile, String> staleSegments = getStaleSegmentsNormalTable(details,
+              allSegments);
+          // move these segments one by one to the trash folder
+          for (Map.Entry<CarbonFile, String> entry : staleSegments.entrySet()) {
+            TrashUtil.copySegmentToTrash(entry.getKey(), carbonTable.getTablePath(),
+                timeStampForTrashFolder + CarbonCommonConstants.FILE_SEPARATOR +
+                CarbonCommonConstants.LOAD_FOLDER + entry.getValue());
+            LOGGER.info("Deleting Segment: " + entry.getKey().getAbsolutePath());
+            try {
+              CarbonUtil.deleteFoldersAndFiles(entry.getKey());
+              LOGGER.info("Deleting Segment: "  + entry.getKey());
+            } catch (IOException | InterruptedException e) {
+              LOGGER.error("Unable to delete the given path :: " + e.getMessage(), e);
+            }
+          }
+          if (staleSegments.size() > 0) {
+            // get the segment metadata path
+            String segmentFilesLocation =
+                CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath());
+            // delete the segment metadata files also
+            CarbonFile[] staleSegmentMetadataFiles = FileFactory.getCarbonFile(segmentFilesLocation)
+                .listFiles(file -> (staleSegments.containsValue(file.getName()
+                .split(CarbonCommonConstants.UNDERSCORE)[0])));
+            for (CarbonFile staleSegmentMetadataFile : staleSegmentMetadataFiles) {
+              staleSegmentMetadataFile.delete();
+            }
+          }
+        } else {
+          String errorMessage =
+              "Not able to acquire the Table status lock for cleaning stale segments for table "
+              + carbonTable.getDatabaseName() + "." + carbonTable.getTableName();
+          LOGGER.error(errorMessage);
+        }
+      } finally {
+        carbonTableStatusLock.unlock();
+      }
+    }
+  }
+
+  /**
+   *
+   * This method will copy data files for partition table to the trash folder, file by file.
+   */
+  public static List<String> copyPartitionTableFilesToTrash(String segmentFile,
+      String timeStampForTrashFolder, String segmentNo, CarbonTable carbonTable,
+      String indexMergeFile) throws IOException {
+    // search for indexMergeFile everywhere and segmentFile in the carbondata file and
+    // copy to trash folder
+    List<String> filesToDelete = new ArrayList<>();
+    for (CarbonFile fileList : filesInPartitionFolder) {
+      String nameofFile = fileList.getName().replace(CarbonCommonConstants.HYPHEN,
+          CarbonCommonConstants.UNDERSCORE);
+      String nameOfSegment = segmentFile.substring(0, segmentFile.indexOf(
+          CarbonCommonConstants.POINT));
+      String partitionValue = fileList.getAbsolutePath().split(CarbonCommonConstants
+          .FILE_SEPARATOR)[fileList.getAbsolutePath().split(CarbonCommonConstants
+          .FILE_SEPARATOR).length - 2];
+      if (fileList.getName().equals(indexMergeFile)) {
+        TrashUtil.copyFileToTrashFolder(carbonTable.getTablePath(),
+            fileList.getAbsolutePath(), timeStampForTrashFolder + CarbonCommonConstants
+            .FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + segmentNo +
+            CarbonCommonConstants.FILE_SEPARATOR + partitionValue);
+        filesToDelete.add(fileList.getAbsolutePath());
+      } else if (nameofFile.contains(nameOfSegment) && fileList.getName().endsWith(
+          CarbonTablePath.INDEX_FILE_EXT)) {
+        TrashUtil.copyFileToTrashFolder(carbonTable.getTablePath(),
+            fileList.getAbsolutePath(), timeStampForTrashFolder + CarbonCommonConstants
+            .FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + segmentNo +
+            CarbonCommonConstants.FILE_SEPARATOR + partitionValue);
+        filesToDelete.add(fileList.getAbsolutePath());
+      } else if (nameofFile.contains(nameOfSegment) && fileList.getName().endsWith(
+          CarbonCommonConstants.FACT_FILE_EXT)) {
+        // carbondata file found
+        TrashUtil.copyFileToTrashFolder(carbonTable.getTablePath(), fileList
+            .getAbsolutePath(), timeStampForTrashFolder + CarbonCommonConstants
+            .FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + segmentNo +
+            CarbonCommonConstants.FILE_SEPARATOR + partitionValue);
+        filesToDelete.add(fileList.getAbsolutePath());
+      }
+    }
+    return filesToDelete;
+  }
+
+  /**
+   *
+   * This method will find all the stale segments in normal table flow.
+   */
+  public static HashMap<CarbonFile, String> getStaleSegmentsNormalTable(LoadMetadataDetails[]
+      details, CarbonFile[] allSegments) {
+    Set<String> metadataSet = new HashSet<>(details.length);
+    for (LoadMetadataDetails detail : details) {
+      metadataSet.add(detail.getLoadName());
+    }
+    HashMap<CarbonFile, String> staleSegments = new HashMap<>(allSegments.length);
+    for (CarbonFile segment : allSegments) {
+      String segmentName = segment.getName();
+      // check segment folder pattern
+      if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) {
+        String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE);
+        if (parts.length == 2) {
+          boolean isOriginal = !parts[1].contains(".");
+          // in loading flow,
+          // it should be original segment and segment metadata doesn't exists
+          if (isOriginal && !metadataSet.contains(parts[1])) {
+            staleSegments.put(segment, parts[1]);
+          }
+        }
+      }
+    }
+    return staleSegments;
+  }
+
+  /**
+   *
+   * This method will find all the stale segments for partition table flow.
+   */
+  public static List<String> getStaleSegmentsPartitionTable(LoadMetadataDetails[] details,
+      Map<String, String> detailName, CarbonFile[] allSegment) {
+    List<String> staleSegments = new ArrayList<>();
+    for (CarbonFile file : allSegment) {

Review comment:
       worng way. add only whatever is stale




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#discussion_r527484932



##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.locks.CarbonLockFactory;
+import org.apache.carbondata.core.locks.CarbonLockUtil;
+import org.apache.carbondata.core.locks.ICarbonLock;
+import org.apache.carbondata.core.locks.LockUsage;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.log4j.Logger;
+
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonUtil.class.getName());
+  private static List<CarbonFile> filesInPartitionFolder = new ArrayList<CarbonFile>();
+
+  /**
+   *
+   * This method will clean the stale segments and move them to the trash folder.
+   */
+  public static void cleanStaleSegmentsNonPartitionTable(CarbonTable carbonTable)
+     throws IOException {
+    String metaDataLocation = carbonTable.getMetadataPath();
+    String partitionPath = CarbonTablePath.getPartitionDir(carbonTable.getTablePath());
+    String timeStampForTrashFolder = String.valueOf(new Timestamp(System.currentTimeMillis())
+        .getTime());
+    if (FileFactory.isFileExist(partitionPath)) {
+      // list all segments before reading tablestatus file.
+      CarbonFile[] allSegments = FileFactory.getCarbonFile(partitionPath).listFiles();
+      // there is no segment
+      if (allSegments == null || allSegments.length == 0) {
+        return;
+      }
+      int retryCount = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+          CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+      int maxTimeout = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+          CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
+      ICarbonLock carbonTableStatusLock = CarbonLockFactory
+          .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK);
+      try {
+        if (carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout)) {
+          LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+          // there is no segment or failed to read tablestatus file.
+          // so it should stop immediately.
+          // Do not stop if the tablestatus file does not exist, it can have stale segments.
+          if (FileFactory.getCarbonFile(carbonTable.getMetadataPath() + CarbonCommonConstants
+              .FILE_SEPARATOR + CarbonTablePath.TABLE_STATUS_FILE).exists() && (details == null
+              || details.length == 0)) {
+            return;
+          }
+          HashMap<CarbonFile, String> staleSegments = getStaleSegmentsNormalTable(details,
+              allSegments);
+          // move these segments one by one to the trash folder
+          for (Map.Entry<CarbonFile, String> entry : staleSegments.entrySet()) {
+            TrashUtil.copySegmentToTrash(entry.getKey(), carbonTable.getTablePath(),
+                timeStampForTrashFolder + CarbonCommonConstants.FILE_SEPARATOR +
+                CarbonCommonConstants.LOAD_FOLDER + entry.getValue());
+            LOGGER.info("Deleting Segment: " + entry.getKey().getAbsolutePath());
+            try {
+              CarbonUtil.deleteFoldersAndFiles(entry.getKey());
+              LOGGER.info("Deleting Segment: "  + entry.getKey());
+            } catch (IOException | InterruptedException e) {
+              LOGGER.error("Unable to delete the given path :: " + e.getMessage(), e);
+            }
+          }
+          if (staleSegments.size() > 0) {
+            // get the segment metadata path
+            String segmentFilesLocation =
+                CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath());
+            // delete the segment metadata files also
+            CarbonFile[] staleSegmentMetadataFiles = FileFactory.getCarbonFile(segmentFilesLocation)
+                .listFiles(file -> (staleSegments.containsValue(file.getName()
+                .split(CarbonCommonConstants.UNDERSCORE)[0])));
+            for (CarbonFile staleSegmentMetadataFile : staleSegmentMetadataFiles) {
+              staleSegmentMetadataFile.delete();
+            }
+          }
+        } else {
+          String errorMessage =
+              "Not able to acquire the Table status lock for cleaning stale segments for table "
+              + carbonTable.getDatabaseName() + "." + carbonTable.getTableName();
+          LOGGER.error(errorMessage);
+        }
+      } finally {
+        carbonTableStatusLock.unlock();
+      }
+    }
+  }
+
+  /**
+   *
+   * This method will copy data files for partition table to the trash folder, file by file.
+   */
+  public static List<String> copyPartitionTableFilesToTrash(String segmentFile,
+      String timeStampForTrashFolder, String segmentNo, CarbonTable carbonTable,
+      String indexMergeFile) throws IOException {
+    // search for indexMergeFile everywhere and segmentFile in the carbondata file and
+    // copy to trash folder
+    List<String> filesToDelete = new ArrayList<>();
+    for (CarbonFile fileList : filesInPartitionFolder) {
+      String nameofFile = fileList.getName().replace(CarbonCommonConstants.HYPHEN,
+          CarbonCommonConstants.UNDERSCORE);
+      String nameOfSegment = segmentFile.substring(0, segmentFile.indexOf(
+          CarbonCommonConstants.POINT));
+      String partitionValue = fileList.getAbsolutePath().split(CarbonCommonConstants
+          .FILE_SEPARATOR)[fileList.getAbsolutePath().split(CarbonCommonConstants
+          .FILE_SEPARATOR).length - 2];
+      if (fileList.getName().equals(indexMergeFile)) {
+        TrashUtil.copyFileToTrashFolder(carbonTable.getTablePath(),
+            fileList.getAbsolutePath(), timeStampForTrashFolder + CarbonCommonConstants
+            .FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + segmentNo +
+            CarbonCommonConstants.FILE_SEPARATOR + partitionValue);
+        filesToDelete.add(fileList.getAbsolutePath());
+      } else if (nameofFile.contains(nameOfSegment) && fileList.getName().endsWith(
+          CarbonTablePath.INDEX_FILE_EXT)) {
+        TrashUtil.copyFileToTrashFolder(carbonTable.getTablePath(),
+            fileList.getAbsolutePath(), timeStampForTrashFolder + CarbonCommonConstants
+            .FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + segmentNo +
+            CarbonCommonConstants.FILE_SEPARATOR + partitionValue);
+        filesToDelete.add(fileList.getAbsolutePath());
+      } else if (nameofFile.contains(nameOfSegment) && fileList.getName().endsWith(
+          CarbonCommonConstants.FACT_FILE_EXT)) {
+        // carbondata file found
+        TrashUtil.copyFileToTrashFolder(carbonTable.getTablePath(), fileList
+            .getAbsolutePath(), timeStampForTrashFolder + CarbonCommonConstants
+            .FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + segmentNo +
+            CarbonCommonConstants.FILE_SEPARATOR + partitionValue);
+        filesToDelete.add(fileList.getAbsolutePath());
+      }
+    }
+    return filesToDelete;
+  }
+
+  /**
+   *
+   * This method will find all the stale segments in normal table flow.
+   */
+  public static HashMap<CarbonFile, String> getStaleSegmentsNormalTable(LoadMetadataDetails[]
+      details, CarbonFile[] allSegments) {
+    Set<String> metadataSet = new HashSet<>(details.length);
+    for (LoadMetadataDetails detail : details) {
+      metadataSet.add(detail.getLoadName());
+    }
+    HashMap<CarbonFile, String> staleSegments = new HashMap<>(allSegments.length);
+    for (CarbonFile segment : allSegments) {
+      String segmentName = segment.getName();
+      // check segment folder pattern
+      if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) {
+        String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE);
+        if (parts.length == 2) {
+          boolean isOriginal = !parts[1].contains(".");
+          // in loading flow,
+          // it should be original segment and segment metadata doesn't exists
+          if (isOriginal && !metadataSet.contains(parts[1])) {
+            staleSegments.put(segment, parts[1]);
+          }
+        }
+      }
+    }
+    return staleSegments;
+  }
+
+  /**
+   *
+   * This method will find all the stale segments for partition table flow.
+   */
+  public static List<String> getStaleSegmentsPartitionTable(LoadMetadataDetails[] details,
+      Map<String, String> detailName, CarbonFile[] allSegment) {
+    List<String> staleSegments = new ArrayList<>();
+    for (CarbonFile file : allSegment) {
+      staleSegments.add(file.getName());
+    }
+    Iterator detailNameIterator = detailName.entrySet().iterator();
+    while (detailNameIterator.hasNext()) {
+      Map.Entry mapElement = (Map.Entry) detailNameIterator.next();
+      staleSegments.remove(mapElement.getValue());
+    }

Review comment:
       find stale segment first and just add them, no need to remove




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#issuecomment-731178128


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3075/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#issuecomment-731178680


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4831/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#issuecomment-731236951


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4832/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#issuecomment-731242780


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3076/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#issuecomment-731345038


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4836/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#issuecomment-731350208


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3081/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#issuecomment-731772007


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4847/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#issuecomment-731772163


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3094/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#issuecomment-731991505


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4852/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#issuecomment-731991828


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3099/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#issuecomment-732789701


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4877/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#issuecomment-732790935


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3124/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#issuecomment-732986999


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4888/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#issuecomment-732989642


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3134/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

ajantha-bhat commented on a change in pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#discussion_r530787523



##########
File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
##########
@@ -1414,6 +1414,23 @@ private CarbonCommonConstants() {
 
   public static final String BITSET_PIPE_LINE_DEFAULT = "true";
 
+  /**
+   * this is the user defined time(in days), timestamp subfolders in trash directory will take
+   * this value as retention time. They are deleted after this time.
+   */
+  @CarbonProperty
+  public static final String CARBON_TRASH_RETENTION_DAYS = "carbon.trash.retention.days";
+
+  /**
+   * Default retention time of a subdirectory in trash folder is 7 days.
+   */
+  public static final String CARBON_TRASH_RETENTION_DAYS_DEFAULT = "7";

Review comment:
       Default and max values keep as int only to avoid parsing agian

##########
File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the trash folder in carbondata. This class has methods to copy data to the trash and
+ * remove data from the trash.
+ */
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(TrashUtil.class.getName());
+
+  /**
+   * Base method to copy the data to the trash folder.
+   *
+   * @param fromPath the path from which to copy the file
+   * @param toPath  the path where the file will be copied
+   * @return
+   */
+  private static void copyToTrashFolder(String fromPath, String toPath) throws IOException {
+    DataOutputStream dataOutputStream = null;
+    DataInputStream dataInputStream = null;
+    try {
+      dataOutputStream = FileFactory.getDataOutputStream(toPath);
+      dataInputStream = FileFactory.getDataInputStream(fromPath);
+      IOUtils.copyBytes(dataInputStream, dataOutputStream, CarbonCommonConstants.BYTEBUFFER_SIZE);
+    } catch (IOException exception) {
+      LOGGER.error("Unable to copy " + fromPath + " to the trash folder", exception);
+      throw exception;
+    } finally {
+      CarbonUtil.closeStreams(dataInputStream, dataOutputStream);
+    }
+  }
+
+  /**
+   * The below method copies the complete a file to the trash folder.
+   *
+   * @param filePathToCopy the files which are to be moved to the trash folder
+   * @param trashFolderWithTimestamp    timestamp, partition folder(if any) and segment number
+   * @return
+   */
+  public static void copyFileToTrashFolder(String filePathToCopy,
+      String trashFolderWithTimestamp) throws IOException {
+    CarbonFile carbonFileToCopy = FileFactory.getCarbonFile(filePathToCopy);
+    try {
+      if (carbonFileToCopy.exists()) {
+        if (!FileFactory.isFileExist(trashFolderWithTimestamp)) {
+          FileFactory.mkdirs(trashFolderWithTimestamp);
+        }
+        if (!FileFactory.isFileExist(trashFolderWithTimestamp + CarbonCommonConstants
+            .FILE_SEPARATOR + carbonFileToCopy.getName())) {
+          copyToTrashFolder(filePathToCopy, trashFolderWithTimestamp + CarbonCommonConstants
+              .FILE_SEPARATOR + carbonFileToCopy.getName());
+        }
+      }
+    } catch (IOException e) {
+      LOGGER.error("Error while creating trash folder or copying data to the trash folder", e);
+      throw e;
+    }
+  }
+
+  /**
+   * The below method copies the complete segment folder to the trash folder. Here, the data files
+   * in segment are listed and copied one by one to the trash folder.
+   *
+   * @param segmentPath the folder which are to be moved to the trash folder
+   * @param trashFolderWithTimestamp trashfolderpath with complete timestamp and segment number
+   * @return
+   */
+  public static void copySegmentToTrash(CarbonFile segmentPath,
+      String trashFolderWithTimestamp) throws IOException {
+    try {
+      List<CarbonFile> dataFiles = FileFactory.getFolderList(segmentPath.getAbsolutePath());
+      for (CarbonFile carbonFile : dataFiles) {
+        copyFileToTrashFolder(carbonFile.getAbsolutePath(), trashFolderWithTimestamp);
+      }
+      LOGGER.info("Segment: " + segmentPath.getAbsolutePath() + " has been copied to" +
+          " the trash folder successfully");
+    } catch (IOException e) {
+      LOGGER.error("Error while getting folder list for the segment", e);
+      throw e;
+    }
+  }
+
+  /**
+   * The below method deletes timestamp subdirectories in the trash folder which have expired as
+   * per the user defined retention time
+   */
+  public static void deleteExpiredDataFromTrash(String tablePath) {
+    long retentionMilliSeconds = CarbonProperties.getInstance().getTrashFolderRetentionTime();
+    String trashPath = CarbonTablePath.getTrashFolderPath(tablePath);
+    // Deleting the timestamp based subdirectories in the trashfolder by the given timestamp.
+    try {
+      if (FileFactory.isFileExist(trashPath)) {
+        List<CarbonFile> timestampFolderList = FileFactory.getFolderList(trashPath);
+        long currentTime = System.currentTimeMillis();
+        for (CarbonFile timestampFolder : timestampFolderList) {
+          long trashFolderTimestampSubFolder = Long.parseLong(timestampFolder.getName());
+          // If the timeStamp at which the timeStamp subdirectory has expired as per the user
+          // defined value, delete the complete timeStamp subdirectory
+          if (trashFolderTimestampSubFolder < currentTime - retentionMilliSeconds) {

Review comment:
       current - trashTime > retention time is more readable

##########
File path: docs/clean-files.md
##########
@@ -0,0 +1,56 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to you under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+    
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+
+
+## CLEAN FILES
+
+Clean files command is used to remove the Compacted, Marked For Delete ,In Progress which are stale and partial(Segments which are missing from the table status file but their data is present)
+ segments from the store.
+
+ Clean Files Command
+   ```
+   CLEAN FILES FOR TABLE TABLE_NAME
+   ```
+
+
+### TRASH FOLDER
+
+  Carbondata supports a Trash Folder which is used as a redundant folder where all stale(segments whose entry is not in tablestatus file) carbondata segments are moved to during clean files operation.
+  This trash folder is mantained inside the table path and is a hidden folder(.Trash). The segments that are moved to the trash folder are mantained under a timestamp
+  subfolder(each clean files operation is represented by a timestamp). This helps the user to list down segments in the trash folder by timestamp.  By default all the timestamp sub-directory have an expiration
+  time of 7 days(since the timestamp it was created) and it can be configured by the user using the following carbon property. The supported values are between 0 and 365(both included.)
+   ```
+   carbon.trash.retention.days = "Number of days"
+   ```
+  Once the timestamp subdirectory is expired as per the configured expiration day value, that subdirectory is deleted from the trash folder in the subsequent clean files command.
+
+### FORCE DELETE TRASH
+The force option with clean files command deletes all the files and folders from the trash folder.
+
+  ```
+  CLEAN FILES FOR TABLE TABLE_NAME options('force'='true')
+  ```
+
+### DATA RECOVERY FROM THE TRASH FOLDER
+
+The segments can be recovered from the trash folder by creating table from the desired segment location

Review comment:
       mention as external table

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for table given table. In this method, we first
+   * get the stale segments(segments whose entry is not in the table status, but are present in
+   * the metadata folder) or in case when table status is deleted. To identify the stale segments
+   * we compare the segment files in the metadata folder with table status file, if it exists. The
+   * identified stale segments are then copied to the trash folder and then their .segment files
+   * are also deleted from the metadata folder. We only compare with tablestatus file here, not
+   * with tablestatus history file.
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    String metaDataLocation = carbonTable.getMetadataPath();
+    long timeStampForTrashFolder = System.currentTimeMillis();
+    String segmentFilesLocation =
+        CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath());
+    CarbonFile[] segmentFilesList = FileFactory.getCarbonFile(segmentFilesLocation).listFiles();
+    // there are no segments present in the Metadata folder. Can return here
+    if (segmentFilesList.length == 0) {
+      return;
+    }
+    LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+    List<String> staleSegments = getStaleSegments(details, segmentFilesList);

Review comment:
       just get the list of segment data folder directory for all the stale segment and move segment by segment, not file by file. whole directory you can copy by calling recursive copy.

##########
File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the trash folder in carbondata. This class has methods to copy data to the trash and
+ * remove data from the trash.
+ */
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(TrashUtil.class.getName());
+
+  /**
+   * Base method to copy the data to the trash folder.
+   *
+   * @param fromPath the path from which to copy the file
+   * @param toPath  the path where the file will be copied
+   * @return
+   */
+  private static void copyToTrashFolder(String fromPath, String toPath) throws IOException {
+    DataOutputStream dataOutputStream = null;
+    DataInputStream dataInputStream = null;
+    try {
+      dataOutputStream = FileFactory.getDataOutputStream(toPath);
+      dataInputStream = FileFactory.getDataInputStream(fromPath);
+      IOUtils.copyBytes(dataInputStream, dataOutputStream, CarbonCommonConstants.BYTEBUFFER_SIZE);
+    } catch (IOException exception) {
+      LOGGER.error("Unable to copy " + fromPath + " to the trash folder", exception);
+      throw exception;
+    } finally {
+      CarbonUtil.closeStreams(dataInputStream, dataOutputStream);
+    }
+  }
+
+  /**
+   * The below method copies the complete a file to the trash folder.
+   *
+   * @param filePathToCopy the files which are to be moved to the trash folder
+   * @param trashFolderWithTimestamp    timestamp, partition folder(if any) and segment number
+   * @return
+   */
+  public static void copyFileToTrashFolder(String filePathToCopy,
+      String trashFolderWithTimestamp) throws IOException {
+    CarbonFile carbonFileToCopy = FileFactory.getCarbonFile(filePathToCopy);
+    try {
+      if (carbonFileToCopy.exists()) {
+        if (!FileFactory.isFileExist(trashFolderWithTimestamp)) {
+          FileFactory.mkdirs(trashFolderWithTimestamp);
+        }
+        if (!FileFactory.isFileExist(trashFolderWithTimestamp + CarbonCommonConstants
+            .FILE_SEPARATOR + carbonFileToCopy.getName())) {
+          copyToTrashFolder(filePathToCopy, trashFolderWithTimestamp + CarbonCommonConstants
+              .FILE_SEPARATOR + carbonFileToCopy.getName());
+        }
+      }
+    } catch (IOException e) {
+      LOGGER.error("Error while creating trash folder or copying data to the trash folder", e);
+      throw e;
+    }
+  }
+
+  /**
+   * The below method copies the complete segment folder to the trash folder. Here, the data files
+   * in segment are listed and copied one by one to the trash folder.
+   *
+   * @param segmentPath the folder which are to be moved to the trash folder
+   * @param trashFolderWithTimestamp trashfolderpath with complete timestamp and segment number
+   * @return
+   */
+  public static void copySegmentToTrash(CarbonFile segmentPath,

Review comment:
       right now unused ?

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
##########
@@ -54,6 +52,12 @@ class CleanFilesPostEventListener extends OperationEventListener with Logging {
         val indexTables = CarbonIndexUtil
           .getIndexCarbonTables(carbonTable, cleanFilesPostEvent.sparkSession)
         indexTables.foreach { indexTable =>
+          if (cleanFilesPostEvent.force) {
+            TrashUtil.emptyTrash(indexTable.getTablePath)
+          } else {
+            TrashUtil.deleteExpiredDataFromTrash(indexTable.getTablePath)
+          }
+          CleanFilesUtil.cleanStaleSegments(indexTable)

Review comment:
       same as above doubt

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.cleanfiles
+
+import java.io.{File, PrintWriter}
+
+import scala.io.Source
+
+import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterAll {
+
+  var count = 0
+
+  test("clean up table and test trash folder with IN PROGRESS segments") {
+    // do not send the segment folders to trash
+    createParitionTable()
+    loadData()
+    val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.TRASH_DIR
+    editTableStatusFile(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
+    assert(segmentNumber1 == 4)
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
+    assert(0 == segmentNumber2)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+  test("clean up table and test trash folder with Marked For Delete segments") {
+    // do not send MFD folders to trash
+    createParitionTable()
+    loadData()
+    val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.TRASH_DIR
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"""Delete from table cleantest where segment.id in(1)""")
+    val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
+    assert(segmentNumber1 == segmentNumber2 + 1)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash

Review comment:
       can combine with above testcase I guess

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
##########
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.cleanfiles
+
+import java.io.{File, PrintWriter}
+
+import scala.io.Source
+
+import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
+
+  var count = 0
+
+  test("clean up table and test trash folder with IN PROGRESS segments") {
+    // do not send the segment folders to trash
+    createTable()
+    loadData()
+    val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.TRASH_DIR
+    editTableStatusFile(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+
+    val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
+    assert(segmentNumber1 == 4)
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
+    assert(0 == segmentNumber2)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    count = 0

Review comment:
       no need of this ?

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.cleanfiles
+
+import java.io.{File, PrintWriter}
+
+import scala.io.Source
+
+import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterAll {
+
+  var count = 0
+
+  test("clean up table and test trash folder with IN PROGRESS segments") {
+    // do not send the segment folders to trash
+    createParitionTable()
+    loadData()
+    val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.TRASH_DIR
+    editTableStatusFile(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
+    assert(segmentNumber1 == 4)
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
+    assert(0 == segmentNumber2)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+  test("clean up table and test trash folder with Marked For Delete segments") {
+    // do not send MFD folders to trash
+    createParitionTable()
+    loadData()
+    val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.TRASH_DIR
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"""Delete from table cleantest where segment.id in(1)""")
+    val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
+    assert(segmentNumber1 == segmentNumber2 + 1)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+  test("clean up table and test trash folder with compaction") {
+    // do not send compacted folders to trash
+    createParitionTable()
+    loadData()
+    sql(s"""ALTER TABLE CLEANTEST COMPACT "MINOR" """)
+
+    val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.TRASH_DIR
+    assert(!FileFactory.isFileExist(trashFolderPath))
+
+    val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
+    assert(segmentNumber1 == segmentNumber2 + 4)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    count = 0
+    val list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+
+
+  test("test trash folder with 2 segments with same segment number") {
+    createParitionTable()
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"hello","abc"""")
+
+    val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.TRASH_DIR
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    deleteTableStatusFile(path)
+
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 2)
+
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"hello","abc"""")
+    deleteTableStatusFile(path)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 4)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+  test("test carbon.trash.retenion.property") {
+    CarbonProperties.getInstance()

Review comment:
       duplicate testcase, handle only partition specific testcase here

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.cleanfiles
+
+import java.io.{File, PrintWriter}
+
+import scala.io.Source
+
+import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterAll {
+
+  var count = 0
+
+  test("clean up table and test trash folder with IN PROGRESS segments") {
+    // do not send the segment folders to trash
+    createParitionTable()
+    loadData()
+    val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.TRASH_DIR
+    editTableStatusFile(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
+    assert(segmentNumber1 == 4)
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
+    assert(0 == segmentNumber2)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+  test("clean up table and test trash folder with Marked For Delete segments") {
+    // do not send MFD folders to trash
+    createParitionTable()
+    loadData()
+    val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.TRASH_DIR
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"""Delete from table cleantest where segment.id in(1)""")
+    val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
+    assert(segmentNumber1 == segmentNumber2 + 1)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash

Review comment:
       also below one

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
##########
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.cleanfiles
+
+import java.io.{File, PrintWriter}
+
+import scala.io.Source
+
+import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
+
+  var count = 0
+
+  test("clean up table and test trash folder with IN PROGRESS segments") {
+    // do not send the segment folders to trash
+    createTable()
+    loadData()
+    val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.TRASH_DIR

Review comment:
       USE existing API, like tablePath.getTrashDir

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for table given table. In this method, we first
+   * get the stale segments(segments whose entry is not in the table status, but are present in
+   * the metadata folder) or in case when table status is deleted. To identify the stale segments
+   * we compare the segment files in the metadata folder with table status file, if it exists. The
+   * identified stale segments are then copied to the trash folder and then their .segment files
+   * are also deleted from the metadata folder. We only compare with tablestatus file here, not
+   * with tablestatus history file.
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    String metaDataLocation = carbonTable.getMetadataPath();
+    long timeStampForTrashFolder = System.currentTimeMillis();
+    String segmentFilesLocation =
+        CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath());
+    CarbonFile[] segmentFilesList = FileFactory.getCarbonFile(segmentFilesLocation).listFiles();
+    // there are no segments present in the Metadata folder. Can return here
+    if (segmentFilesList.length == 0) {
+      return;
+    }
+    LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+    List<String> staleSegments = getStaleSegments(details, segmentFilesList);
+
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        // for each segment we get the indexfile first, then we get the carbondata file. Move both
+        // of those to trash folder
+        List<CarbonFile> filesToDelete = new ArrayList<>();
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
+            FileFactory.getConfiguration());
+        for (String file : indexOrMergeFiles) {
+          // copy the index or merge file to the trash folder
+          TrashUtil.copyFileToTrashFolder(file, CarbonTablePath.getTrashFolderPath(carbonTable
+              .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder +
+              CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX +
+              segmentNumber);
+          filesToDelete.add(FileFactory.getCarbonFile(file));
+        }
+        // get carbondata files from here
+        Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
+        for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+          for (String file : entry.getValue()) {
+            // copy the carbondata file to trash
+            TrashUtil.copyFileToTrashFolder(file, CarbonTablePath.getTrashFolderPath(carbonTable
+                .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder
+                + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX
+                + segmentNumber);
+            filesToDelete.add(FileFactory.getCarbonFile(file));
+          }
+        }
+        // Delete the segment file too
+        filesToDelete.add(FileFactory.getCarbonFile(CarbonTablePath.getSegmentFilePath(carbonTable
+            .getTablePath(), staleSegment)));
+        // After every file of that segment has been copied, need to delete those files.
+        LOGGER.info("Segment number: " + segmentNumber + "has been successfully copied to the" +
+            " trash folder");
+        try {
+          for (CarbonFile file : filesToDelete) {
+            if (file.isFileExist()) {

Review comment:
       stale segments are identified based on segment files only, no need of segment exist check here

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for table given table. In this method, we first
+   * get the stale segments(segments whose entry is not in the table status, but are present in
+   * the metadata folder) or in case when table status is deleted. To identify the stale segments
+   * we compare the segment files in the metadata folder with table status file, if it exists. The
+   * identified stale segments are then copied to the trash folder and then their .segment files
+   * are also deleted from the metadata folder. We only compare with tablestatus file here, not
+   * with tablestatus history file.
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    String metaDataLocation = carbonTable.getMetadataPath();
+    long timeStampForTrashFolder = System.currentTimeMillis();
+    String segmentFilesLocation =
+        CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath());
+    CarbonFile[] segmentFilesList = FileFactory.getCarbonFile(segmentFilesLocation).listFiles();
+    // there are no segments present in the Metadata folder. Can return here
+    if (segmentFilesList.length == 0) {
+      return;
+    }
+    LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+    List<String> staleSegments = getStaleSegments(details, segmentFilesList);
+
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        // for each segment we get the indexfile first, then we get the carbondata file. Move both
+        // of those to trash folder
+        List<CarbonFile> filesToDelete = new ArrayList<>();
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
+            FileFactory.getConfiguration());
+        for (String file : indexOrMergeFiles) {
+          // copy the index or merge file to the trash folder
+          TrashUtil.copyFileToTrashFolder(file, CarbonTablePath.getTrashFolderPath(carbonTable
+              .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder +
+              CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX +
+              segmentNumber);
+          filesToDelete.add(FileFactory.getCarbonFile(file));
+        }
+        // get carbondata files from here
+        Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
+        for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+          for (String file : entry.getValue()) {
+            // copy the carbondata file to trash
+            TrashUtil.copyFileToTrashFolder(file, CarbonTablePath.getTrashFolderPath(carbonTable
+                .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder
+                + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX
+                + segmentNumber);
+            filesToDelete.add(FileFactory.getCarbonFile(file));
+          }
+        }
+        // Delete the segment file too
+        filesToDelete.add(FileFactory.getCarbonFile(CarbonTablePath.getSegmentFilePath(carbonTable
+            .getTablePath(), staleSegment)));
+        // After every file of that segment has been copied, need to delete those files.
+        LOGGER.info("Segment number: " + segmentNumber + "has been successfully copied to the" +
+            " trash folder");
+        try {
+          for (CarbonFile file : filesToDelete) {
+            if (file.isFileExist()) {
+              FileFactory.deleteFile(file.getAbsolutePath());
+              // deleting empty segment folder in case of normal table and partition folders
+              // in case of partition table
+              SegmentFileStore.deleteEmptyPartitionFolders(FileFactory.getCarbonFile(new Path(file

Review comment:
       Caller itself separate partition and non-partition table. Don't handle both in same. Extract common method and handle.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

ajantha-bhat commented on a change in pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#discussion_r530794465



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
##########
@@ -91,6 +96,14 @@ case class CarbonCleanFilesCommand(
     OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent, operationContext)
     if (tableName.isDefined) {
       Checker.validateTableExists(databaseNameOp, tableName.get, sparkSession)
+      if (forceClean) {
+        // empty the trash folder
+        TrashUtil.emptyTrash(carbonTable.getTablePath)
+      } else {
+        // clear trash based on timestamp
+        TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath)
+      }
+      CleanFilesUtil.cleanStaleSegments(carbonTable)

Review comment:
       this was already cleaned before moving to trash right ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#discussion_r530955721



##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for table given table. In this method, we first
+   * get the stale segments(segments whose entry is not in the table status, but are present in
+   * the metadata folder) or in case when table status is deleted. To identify the stale segments
+   * we compare the segment files in the metadata folder with table status file, if it exists. The
+   * identified stale segments are then copied to the trash folder and then their .segment files
+   * are also deleted from the metadata folder. We only compare with tablestatus file here, not
+   * with tablestatus history file.
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    String metaDataLocation = carbonTable.getMetadataPath();
+    long timeStampForTrashFolder = System.currentTimeMillis();
+    String segmentFilesLocation =
+        CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath());
+    CarbonFile[] segmentFilesList = FileFactory.getCarbonFile(segmentFilesLocation).listFiles();
+    // there are no segments present in the Metadata folder. Can return here
+    if (segmentFilesList.length == 0) {
+      return;
+    }
+    LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+    List<String> staleSegments = getStaleSegments(details, segmentFilesList);
+
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        // for each segment we get the indexfile first, then we get the carbondata file. Move both
+        // of those to trash folder
+        List<CarbonFile> filesToDelete = new ArrayList<>();
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
+            FileFactory.getConfiguration());
+        for (String file : indexOrMergeFiles) {
+          // copy the index or merge file to the trash folder
+          TrashUtil.copyFileToTrashFolder(file, CarbonTablePath.getTrashFolderPath(carbonTable
+              .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder +
+              CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX +
+              segmentNumber);
+          filesToDelete.add(FileFactory.getCarbonFile(file));
+        }
+        // get carbondata files from here
+        Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
+        for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+          for (String file : entry.getValue()) {
+            // copy the carbondata file to trash
+            TrashUtil.copyFileToTrashFolder(file, CarbonTablePath.getTrashFolderPath(carbonTable
+                .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder
+                + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX
+                + segmentNumber);
+            filesToDelete.add(FileFactory.getCarbonFile(file));
+          }
+        }
+        // Delete the segment file too
+        filesToDelete.add(FileFactory.getCarbonFile(CarbonTablePath.getSegmentFilePath(carbonTable
+            .getTablePath(), staleSegment)));
+        // After every file of that segment has been copied, need to delete those files.
+        LOGGER.info("Segment number: " + segmentNumber + "has been successfully copied to the" +
+            " trash folder");
+        try {
+          for (CarbonFile file : filesToDelete) {
+            if (file.isFileExist()) {

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for table given table. In this method, we first
+   * get the stale segments(segments whose entry is not in the table status, but are present in
+   * the metadata folder) or in case when table status is deleted. To identify the stale segments
+   * we compare the segment files in the metadata folder with table status file, if it exists. The
+   * identified stale segments are then copied to the trash folder and then their .segment files
+   * are also deleted from the metadata folder. We only compare with tablestatus file here, not
+   * with tablestatus history file.
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    String metaDataLocation = carbonTable.getMetadataPath();
+    long timeStampForTrashFolder = System.currentTimeMillis();
+    String segmentFilesLocation =
+        CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath());
+    CarbonFile[] segmentFilesList = FileFactory.getCarbonFile(segmentFilesLocation).listFiles();
+    // there are no segments present in the Metadata folder. Can return here
+    if (segmentFilesList.length == 0) {
+      return;
+    }
+    LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+    List<String> staleSegments = getStaleSegments(details, segmentFilesList);
+
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        // for each segment we get the indexfile first, then we get the carbondata file. Move both
+        // of those to trash folder
+        List<CarbonFile> filesToDelete = new ArrayList<>();
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
+            FileFactory.getConfiguration());
+        for (String file : indexOrMergeFiles) {
+          // copy the index or merge file to the trash folder
+          TrashUtil.copyFileToTrashFolder(file, CarbonTablePath.getTrashFolderPath(carbonTable
+              .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder +
+              CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX +
+              segmentNumber);
+          filesToDelete.add(FileFactory.getCarbonFile(file));
+        }
+        // get carbondata files from here
+        Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
+        for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+          for (String file : entry.getValue()) {
+            // copy the carbondata file to trash
+            TrashUtil.copyFileToTrashFolder(file, CarbonTablePath.getTrashFolderPath(carbonTable
+                .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder
+                + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX
+                + segmentNumber);
+            filesToDelete.add(FileFactory.getCarbonFile(file));
+          }
+        }
+        // Delete the segment file too
+        filesToDelete.add(FileFactory.getCarbonFile(CarbonTablePath.getSegmentFilePath(carbonTable
+            .getTablePath(), staleSegment)));
+        // After every file of that segment has been copied, need to delete those files.
+        LOGGER.info("Segment number: " + segmentNumber + "has been successfully copied to the" +
+            " trash folder");
+        try {
+          for (CarbonFile file : filesToDelete) {
+            if (file.isFileExist()) {
+              FileFactory.deleteFile(file.getAbsolutePath());
+              // deleting empty segment folder in case of normal table and partition folders
+              // in case of partition table
+              SegmentFileStore.deleteEmptyPartitionFolders(FileFactory.getCarbonFile(new Path(file

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the trash folder in carbondata. This class has methods to copy data to the trash and
+ * remove data from the trash.
+ */
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(TrashUtil.class.getName());
+
+  /**
+   * Base method to copy the data to the trash folder.
+   *
+   * @param fromPath the path from which to copy the file
+   * @param toPath  the path where the file will be copied
+   * @return
+   */
+  private static void copyToTrashFolder(String fromPath, String toPath) throws IOException {
+    DataOutputStream dataOutputStream = null;
+    DataInputStream dataInputStream = null;
+    try {
+      dataOutputStream = FileFactory.getDataOutputStream(toPath);
+      dataInputStream = FileFactory.getDataInputStream(fromPath);
+      IOUtils.copyBytes(dataInputStream, dataOutputStream, CarbonCommonConstants.BYTEBUFFER_SIZE);
+    } catch (IOException exception) {
+      LOGGER.error("Unable to copy " + fromPath + " to the trash folder", exception);
+      throw exception;
+    } finally {
+      CarbonUtil.closeStreams(dataInputStream, dataOutputStream);
+    }
+  }
+
+  /**
+   * The below method copies the complete a file to the trash folder.
+   *
+   * @param filePathToCopy the files which are to be moved to the trash folder
+   * @param trashFolderWithTimestamp    timestamp, partition folder(if any) and segment number
+   * @return
+   */
+  public static void copyFileToTrashFolder(String filePathToCopy,
+      String trashFolderWithTimestamp) throws IOException {
+    CarbonFile carbonFileToCopy = FileFactory.getCarbonFile(filePathToCopy);
+    try {
+      if (carbonFileToCopy.exists()) {
+        if (!FileFactory.isFileExist(trashFolderWithTimestamp)) {
+          FileFactory.mkdirs(trashFolderWithTimestamp);
+        }
+        if (!FileFactory.isFileExist(trashFolderWithTimestamp + CarbonCommonConstants
+            .FILE_SEPARATOR + carbonFileToCopy.getName())) {
+          copyToTrashFolder(filePathToCopy, trashFolderWithTimestamp + CarbonCommonConstants
+              .FILE_SEPARATOR + carbonFileToCopy.getName());
+        }
+      }
+    } catch (IOException e) {
+      LOGGER.error("Error while creating trash folder or copying data to the trash folder", e);
+      throw e;
+    }
+  }
+
+  /**
+   * The below method copies the complete segment folder to the trash folder. Here, the data files
+   * in segment are listed and copied one by one to the trash folder.
+   *
+   * @param segmentPath the folder which are to be moved to the trash folder
+   * @param trashFolderWithTimestamp trashfolderpath with complete timestamp and segment number
+   * @return
+   */
+  public static void copySegmentToTrash(CarbonFile segmentPath,

Review comment:
       Is now being used for normal table clean files flow




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


123456 ... 8