[GitHub] [carbondata] vikramahuja1001 opened a new pull request #3917: [WIP] clean files refactor


[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata


CarbonDataQA1 commented on pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#issuecomment-717883448


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2960/
   



[GitHub] [carbondata] QiangCai commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata


QiangCai commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r513883725



##########
File path: core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
##########
@@ -192,11 +208,17 @@ private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad,
   }
 
   private static boolean checkIfLoadCanBeDeletedPhysically(LoadMetadataDetails oneLoad,
-      boolean isForceDelete) {
+      boolean isForceDelete, AbsoluteTableIdentifier absoluteTableIdentifier) {
     // Check if the segment is added externally and path is set then do not delete it
     if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus()
-        || SegmentStatus.COMPACTED == oneLoad.getSegmentStatus()) && (oneLoad.getPath() == null
+        || SegmentStatus.COMPACTED == oneLoad.getSegmentStatus() || SegmentStatus
+        .INSERT_IN_PROGRESS == oneLoad.getSegmentStatus()) && (oneLoad.getPath() == null

Review comment:
       About clean files: because cleaning different load statuses carries different levels of risk,
   in my opinion each load status should have its own expiration time for keeping data at its original place.
   
   1. MARKED_FOR_DELETE and COMPACTED => MaxQueryTimeout is enough
   
   2. INSERT_IN_PROGRESS and INSERT_OVERWRITE_IN_PROGRESS => more than the data processing time (maybe 3 days).
     If a load or compaction runs longer than LockingTimeout and MaxQueryTimeout, cleaning in-progress segments carries a big risk.
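To make the threshold idea concrete, here is a minimal sketch of status-dependent retention. It is illustrative only, not the PR's actual API: `retentionMillisFor`, `canBeCleaned`, and the concrete timeout values are hypothetical; only the `SegmentStatus` constants come from CarbonData.

```scala
// Hypothetical sketch: one retention threshold per segment status, checked
// before a load is allowed to leave its original place.
import java.util.concurrent.TimeUnit

import org.apache.carbondata.core.statusmanager.SegmentStatus

object RetentionSketch {
  // MARKED_FOR_DELETE / COMPACTED: the max query timeout is enough.
  private val maxQueryTimeoutMillis: Long = TimeUnit.HOURS.toMillis(1)
  // IN_PROGRESS states: longer than any plausible load/compaction, e.g. 3 days.
  private val inProgressRetentionMillis: Long = TimeUnit.DAYS.toMillis(3)

  def retentionMillisFor(status: SegmentStatus): Long = status match {
    case SegmentStatus.MARKED_FOR_DELETE | SegmentStatus.COMPACTED =>
      maxQueryTimeoutMillis
    case SegmentStatus.INSERT_IN_PROGRESS | SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS =>
      inProgressRetentionMillis
    case _ =>
      Long.MaxValue // other statuses are never cleaned implicitly
  }

  // A load may be cleaned only after its status-specific retention has passed.
  def canBeCleaned(status: SegmentStatus, lastModifiedMillis: Long, nowMillis: Long): Boolean =
    nowMillis - lastModifiedMillis > retentionMillisFor(status)
}
```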





[GitHub] [carbondata] QiangCai commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata


QiangCai commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r513884891



##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1105,28 +1109,79 @@ public static void cleanSegments(CarbonTable table, List<PartitionSpec> partitio
    * @throws IOException
    */
   public static void deleteSegment(String tablePath, Segment segment,
-      List<PartitionSpec> partitionSpecs,
-      SegmentUpdateStatusManager updateStatusManager) throws Exception {
+      List<PartitionSpec> partitionSpecs, SegmentUpdateStatusManager updateStatusManager,
+      SegmentStatus segmentStatus, Boolean isPartitionTable, String timeStamp)
+      throws Exception {
     SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName());
     List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
         FileFactory.getConfiguration());
+    List<String> filesToDelete = new ArrayList<>();
     Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
     for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
-      FileFactory.deleteFile(entry.getKey());
+      // Move the file to the trash folder in case the segment status is insert in progress
+      if (segmentStatus == SegmentStatus.INSERT_IN_PROGRESS) {
+        if (!isPartitionTable) {
+          TrashUtil.moveDataToTrashFolderByFile(tablePath, entry.getKey(), timeStamp +
+              CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + segment
+              .getSegmentNo());
+        } else {
+          TrashUtil.moveDataToTrashFolderByFile(tablePath, entry.getKey(), timeStamp +

Review comment:
       ok
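For readers following the diff: the destination under the trash folder is derived from the clean-files timestamp plus the segment folder name. A hedged sketch of that layout follows, assuming `.Trash` is the folder name from docs/cleanfiles.md and that `CarbonCommonConstants.LOAD_FOLDER` is the `Segment_` prefix; `trashDestination` is an illustrative helper, not the PR's API.

```scala
// Hypothetical sketch of the trash destination implied by
// timeStamp + FILE_SEPARATOR + LOAD_FOLDER + segmentNo in the hunk above.
object TrashPathSketch {
  private val TrashFolderName = ".Trash" // hidden folder under the table path

  def trashDestination(tablePath: String, timestamp: String,
      segmentNo: String, fileName: String): String =
    // e.g. /store/db/tbl/.Trash/1604000000000/Segment_2/xxx.carbonindex
    s"$tablePath/$TrashFolderName/$timestamp/Segment_$segmentNo/$fileName"
}
```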





[GitHub] [carbondata] QiangCai commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata


QiangCai commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r513889155



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableClean <true> and cleans garbage segments
+   * (MARKED_FOR_DELETE state) if forceTableClean <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> for force clean it will delete all data
+   *                               <false> it will clean garbage segment (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive Partitions  details
+   */
+  def cleanFiles(
+                  dbName: String,
+                  tableName: String,
+                  tablePath: String,
+                  carbonTable: CarbonTable,
+                  forceTableClean: Boolean,
+                  currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+                  truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock because another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {

Review comment:
       if we do not use it, we can remove it





[GitHub] [carbondata] QiangCai commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata


QiangCai commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r513889604



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableClean <true> and cleans garbage segments
+   * (MARKED_FOR_DELETE state) if forceTableClean <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> for force clean it will delete all data
+   *                               <false> it will clean garbage segment (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive Partitions  details
+   */
+  def cleanFiles(
+                  dbName: String,
+                  tableName: String,
+                  tablePath: String,
+                  carbonTable: CarbonTable,
+                  forceTableClean: Boolean,
+                  currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+                  truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock because another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)

Review comment:
       truncateTable is a function, maybe we need it.





[GitHub] [carbondata] QiangCai commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata


QiangCai commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r513889893



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableClean <true> and cleans garbage segments
+   * (MARKED_FOR_DELETE state) if forceTableClean <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> for force clean it will delete all data
+   *                               <false> it will clean garbage segment (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive Partitions  details
+   */
+  def cleanFiles(
+    dbName: String,
+    tableName: String,
+    tablePath: String,
+    carbonTable: CarbonTable,
+    forceTableClean: Boolean,
+    currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+    truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock because another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)
+        }
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)

Review comment:
       is there a conflict with update/delete/merge?





[GitHub] [carbondata] QiangCai commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata


QiangCai commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r513891217



##########
File path: core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
##########
@@ -1136,7 +1137,8 @@ public static void deleteLoadsAndUpdateMetadata(CarbonTable carbonTable, boolean
           if (updateCompletionStatus) {
             DeleteLoadFolders
                 .physicalFactAndMeasureMetadataDeletion(carbonTable, newAddedLoadHistoryList,
-                    isForceDeletion, partitionSpecs);
+                    isForceDeletion, partitionSpecs, String.valueOf(new Timestamp(System

Review comment:
       if this method is used by clean files only, can we move it to CleanFilesUtil?





[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata


vikramahuja1001 commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r514271886



##########
File path: core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
##########
@@ -1136,7 +1137,8 @@ public static void deleteLoadsAndUpdateMetadata(CarbonTable carbonTable, boolean
           if (updateCompletionStatus) {
             DeleteLoadFolders
                 .physicalFactAndMeasureMetadataDeletion(carbonTable, newAddedLoadHistoryList,
-                    isForceDeletion, partitionSpecs);
+                    isForceDeletion, partitionSpecs, String.valueOf(new Timestamp(System

Review comment:
       yes, moved





[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata


vikramahuja1001 commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r514429233



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableClean <true> and cleans garbage segments
+   * (MARKED_FOR_DELETE state) if forceTableClean <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> for force clean it will delete all data
+   *                               <false> it will clean garbage segment (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive Partitions  details
+   */
+  def cleanFiles(
+    dbName: String,
+    tableName: String,
+    tablePath: String,
+    carbonTable: CarbonTable,
+    forceTableClean: Boolean,
+    currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+    truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock because another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)
+        }
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)

Review comment:
       No, we are copying the complete segment to the trash, so there is no issue with delta files. I also added a test case with delete delta.





[GitHub] [carbondata] brijoobopanna commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata


brijoobopanna commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r514432464



##########
File path: docs/cleanfiles.md
##########
@@ -0,0 +1,78 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to you under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+    
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+
+
+## CLEAN FILES
+
+The clean files command is used to remove Compacted, Marked For Delete, stale In Progress, and Partial segments
+ (segments that are missing from the table status file but whose data is present) from the store.
+
+ Clean Files Command
+   ```
+   CLEAN FILES FOR TABLE TABLE_NAME
+   ```
+
+
+### TRASH FOLDER
+
+  Carbondata supports a Trash Folder which is used as a redundant folder where all the unnecessary files and folders are moved to during clean files operation.

Review comment:
       Can we please rewrite this part instead of saying "unnecessary files and folders"?





[GitHub] [carbondata] brijoobopanna commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata


brijoobopanna commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r514434545



##########
File path: docs/cleanfiles.md
##########
@@ -0,0 +1,78 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to you under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+    
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+
+
+## CLEAN FILES
+
+The clean files command is used to remove Compacted, Marked For Delete, stale In Progress, and Partial segments
+ (segments that are missing from the table status file but whose data is present) from the store.
+
+ Clean Files Command
+   ```
+   CLEAN FILES FOR TABLE TABLE_NAME
+   ```
+
+
+### TRASH FOLDER
+
+  Carbondata supports a Trash Folder which is used as a redundant folder where all the unnecessary files and folders are moved to during clean files operation.
+  This trash folder is maintained inside the table path. It is a hidden folder (.Trash). The segments that are moved to the trash folder are kept under a timestamp
+  subfolder (the timestamp at which the clean files operation is called). This helps the user to list segments by timestamp. By default, each timestamp sub-directory has an expiration
+  time of 3 days from that timestamp, which can be configured by the user using the following carbon property:
+   ```
+   carbon.trash.expiration.time = "Number of days"
+   ```
+  Once a timestamp subdirectory has expired as per the configured expiration value, it is deleted from the trash folder by the next clean files command.
+  
+
+
+
+### DRY RUN
+  Support for dry run is provided before the actual clean files operation. The dry run operation lists all the segments that will be manipulated during
+  the clean files operation. The dry run result shows the current location of each segment (it can be in the FACT folder, a partition folder, or the trash folder) and where that segment

Review comment:
       What about showing the time-threshold-based segments in the trash folder?
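As a usage illustration of the two knobs documented in the quoted hunk, here is a sketch assuming a Carbon-enabled SparkSession; the property key and the `dry_run` option come from the doc under review, while the table name is hypothetical.

```scala
// Sketch: configure trash expiration, preview with dry run, then clean.
import org.apache.spark.sql.SparkSession

import org.apache.carbondata.core.util.CarbonProperties

val spark = SparkSession.builder().appName("clean-files-demo").getOrCreate()

// Expiration (in days) for timestamp subfolders inside .Trash, per the doc.
CarbonProperties.getInstance().addProperty("carbon.trash.expiration.time", "3")

// Dry run first: lists the segments that would be moved or deleted.
spark.sql("CLEAN FILES FOR TABLE my_table OPTIONS('dry_run'='true')").show(false)

// The actual operation.
spark.sql("CLEAN FILES FOR TABLE my_table")
```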





[GitHub] [carbondata] brijoobopanna commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata


brijoobopanna commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r514436587



##########
File path: docs/cleanfiles.md
##########
@@ -0,0 +1,78 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to you under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+    
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+
+
+## CLEAN FILES
+
+The clean files command is used to remove Compacted, Marked For Delete, stale In Progress, and Partial segments
+ (segments that are missing from the table status file but whose data is present) from the store.
+
+ Clean Files Command
+   ```
+   CLEAN FILES FOR TABLE TABLE_NAME
+   ```
+
+
+### TRASH FOLDER
+
+  Carbondata supports a Trash Folder which is used as a redundant folder where all the unnecessary files and folders are moved to during clean files operation.
+  This trash folder is maintained inside the table path. It is a hidden folder (.Trash). The segments that are moved to the trash folder are kept under a timestamp
+  subfolder (the timestamp at which the clean files operation is called). This helps the user to list segments by timestamp. By default, each timestamp sub-directory has an expiration
+  time of 3 days from that timestamp, which can be configured by the user using the following carbon property:
+   ```
+   carbon.trash.expiration.time = "Number of days"
+   ```
+  Once a timestamp subdirectory has expired as per the configured expiration value, it is deleted from the trash folder by the next clean files command.
+  
+
+
+
+### DRY RUN
+  Support for dry run is provided before the actual clean files operation. The dry run operation lists all the segments that will be manipulated during
+  the clean files operation. The dry run result shows the current location of each segment (it can be in the FACT folder, a partition folder, or the trash folder) and where that segment
+  will be moved (to the trash folder, or deleted from the store) once the actual operation is called.
+  
+
+  ```
+  CLEAN FILES FOR TABLE TABLE_NAME options('dry_run'='true')
+  ```
+
+### FORCE DELETE TRASH
+The force option with the clean files command deletes all the files and folders from the trash folder.
+
+  ```
+  CLEAN FILES FOR TABLE TABLE_NAME options('force'='true')
+  ```
+
+### DATA RECOVERY FROM THE TRASH FOLDER
+
+The segments can be recovered from the trash folder by creating an external table from the desired segment location
+in the trash folder and inserting into the original table from the external table. It will create a new segment in the original table.

Review comment:
       It would be good to also explain what the behavior will be for SI or MV tables in these cases.
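To make the documented recovery flow concrete, a sketch under stated assumptions: the trash segment path is illustrative, `spark` is a Carbon-enabled SparkSession, and the external-table syntax follows CarbonData's `STORED AS carbondata` convention.

```scala
// Sketch: recover a segment from the trash by mounting it as an external table
// and inserting back into the original table (this creates a new segment).
val trashSegmentPath = "/store/db/my_table/.Trash/1604000000000/Segment_2" // illustrative

spark.sql(
  s"""CREATE EXTERNAL TABLE recovered_segment STORED AS carbondata
     |LOCATION '$trashSegmentPath'""".stripMargin)

spark.sql("INSERT INTO my_table SELECT * FROM recovered_segment")
spark.sql("DROP TABLE recovered_segment")
```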





[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata


CarbonDataQA1 commented on pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#issuecomment-718948233


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4733/
   



[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata


CarbonDataQA1 commented on pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#issuecomment-718951982


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2974/
   



[GitHub] [carbondata] QiangCai commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata


QiangCai commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r514677476



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableClean <true> and cleans garbage segments
+   * (MARKED_FOR_DELETE state) if forceTableClean <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> for force clean it will delete all data
+   *                               <false> it will clean garbage segment (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive Partitions  details
+   */
+  def cleanFiles(
+    dbName: String,
+    tableName: String,
+    tablePath: String,
+    carbonTable: CarbonTable,
+    forceTableClean: Boolean,
+    currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+    truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock because another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)
+        }
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)

Review comment:
       But I don't find any lock for update/delete when the clean files flow calls cleanUpDeltaFiles. If clean files and an update run concurrently, this method will remove the files generated by the concurrent update.
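One possible direction, sketched with the lock utilities already imported in the quoted file (CarbonLockFactory, LockUsage, CarbonUpdateUtil, ConcurrentOperationException); whether taking the update lock here is the right fix is exactly the open question, so treat this as a hedged sketch rather than the PR's implementation.

```scala
// Hypothetical sketch: guard cleanUpDeltaFiles against a concurrent update/delete
// by acquiring the table's update lock inside the clean files flow.
val updateLock = CarbonLockFactory.getCarbonLockObj(
  carbonTable.getAbsoluteTableIdentifier, LockUsage.UPDATE_LOCK)
try {
  if (updateLock.lockWithRetries()) {
    CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
  } else {
    throw new ConcurrentOperationException(carbonTable, "update", "clean files")
  }
} finally {
  updateLock.unlock()
}
```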





[GitHub] [carbondata] vikramahuja1001 commented on pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata


vikramahuja1001 commented on pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#issuecomment-720367406


   retest this please



[GitHub] [carbondata] QiangCai commented on pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata


QiangCai commented on pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#issuecomment-720948859


   About the dry_run part: this PR writes new code to implement it. It would be better to reuse existing code.



[GitHub] [carbondata] QiangCai commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata


QiangCai commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r515825251



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableClean is <true> and cleans garbage segments
+   * (in MARKED_FOR_DELETE state) if forceTableClean is <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> for force clean it will delete all data
+   *                               <false> it will clean garbage segment (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive partition details
+   */
+  def cleanFiles(
+    dbName: String,
+    tableName: String,
+    tablePath: String,
+    carbonTable: CarbonTable,
+    forceTableClean: Boolean,
+    currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+    truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)
+        }
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)

Review comment:
       @ajantha-bhat now we need to remove the stale delta files immediately, before the next update/delete happens.
   
   
   @vikramahuja1001 if we don't lock update/delete, it would be better to remove the clean files code for update/delete.

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
##########
@@ -2116,6 +2087,28 @@ public int getMaxSIRepairLimit(String dbName, String tableName) {
     return Math.abs(Integer.parseInt(thresholdValue));
   }
 
+  /**
+   * The below method returns the microseconds after which the trash folder will expire
+   */
+  public long getTrashFolderExpirationTime() {
+    String configuredValue = getProperty(CarbonCommonConstants.CARBON_TRASH_EXPIRATION_DAYS,
+            CarbonCommonConstants.CARBON_TRASH_EXPIRATION_DAYS_DEFAULT);
+    Integer result = 0;
+    try {
+      result = Integer.parseInt(configuredValue);
+      if (result < 0) {
+        LOGGER.warn("Value of carbon.trash.expiration.days is negative, taking default value");
+        result = Integer.parseInt(CARBON_TRASH_EXPIRATION_DAYS_DEFAULT);
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.error("Invalid value configured for CarbonCommonConstants" +
+          ".CARBON_TRASH_EXPIRATION_DAYS, considering the default value");

Review comment:
       LOGGER.error("Invalid value configured for " **+ CarbonCommonConstants.CARBON_TRASH_EXPIRATION_DAYS +** ", considering the default value");

##########
File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
##########
@@ -1427,6 +1427,23 @@ private CarbonCommonConstants() {
 
   public static final String BITSET_PIPE_LINE_DEFAULT = "true";
 
+  /**
+   * this is the user defined time(in days), when a specific timestamp subdirectory in
+   * trash folder will expire
+   */
+  @CarbonProperty
+  public static final String CARBON_TRASH_EXPIRATION_DAYS = "carbon.trash.expiration.days";
+
+  /**
+   * Default expiration time of trash folder is 3 days.
+   */
+  public static final String CARBON_TRASH_EXPIRATION_DAYS_DEFAULT = "3";

Review comment:
       How about 7 days, once we remove the in-progress handling?
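   
   For reference, the window is already configurable per deployment; a minimal sketch of overriding it (assuming the standard CarbonProperties.addProperty API):
   
       // sketch: keep trash contents for 7 days instead of the default
       CarbonProperties.getInstance()
           .addProperty("carbon.trash.expiration.days", "7");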

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
##########
@@ -2116,6 +2087,28 @@ public int getMaxSIRepairLimit(String dbName, String tableName) {
     return Math.abs(Integer.parseInt(thresholdValue));
   }
 
+  /**
+   * The below method returns the microseconds after which the trash folder will expire
+   */
+  public long getTrashFolderExpirationTime() {
+    String configuredValue = getProperty(CarbonCommonConstants.CARBON_TRASH_EXPIRATION_DAYS,
+            CarbonCommonConstants.CARBON_TRASH_EXPIRATION_DAYS_DEFAULT);
+    Integer result = 0;
+    try {
+      result = Integer.parseInt(configuredValue);
+      if (result < 0) {
+        LOGGER.warn("Value of carbon.trash.expiration.days is negative, taking default value");
+        result = Integer.parseInt(CARBON_TRASH_EXPIRATION_DAYS_DEFAULT);
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.error("Invalid value configured for CarbonCommonConstants" +
+          ".CARBON_TRASH_EXPIRATION_DAYS, considering the default value");
+      result = Integer.parseInt(CARBON_TRASH_EXPIRATION_DAYS_DEFAULT);
+    }
+    Long microSecondsInADay = Long.valueOf(TimeUnit.DAYS.toMillis(1));
+    return result * microSecondsInADay;

Review comment:
       long microSecondsInADay = TimeUnit.DAYS.toMillis(1);
   return microSecondsInADay * result;
   
   need to put the long variable in front
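   
   Putting the comments on this method together, a sketch of the result (note that TimeUnit.DAYS.toMillis returns milliseconds, not microseconds, so the variable is renamed accordingly here; an illustration, not the final patch):
   
       public long getTrashFolderExpirationTime() {
         String configuredValue = getProperty(CarbonCommonConstants.CARBON_TRASH_EXPIRATION_DAYS,
             CarbonCommonConstants.CARBON_TRASH_EXPIRATION_DAYS_DEFAULT);
         int days;
         try {
           days = Integer.parseInt(configuredValue);
           if (days < 0) {
             LOGGER.warn("Value of " + CarbonCommonConstants.CARBON_TRASH_EXPIRATION_DAYS
                 + " is negative, taking the default value");
             days = Integer.parseInt(CarbonCommonConstants.CARBON_TRASH_EXPIRATION_DAYS_DEFAULT);
           }
         } catch (NumberFormatException e) {
           LOGGER.error("Invalid value configured for "
               + CarbonCommonConstants.CARBON_TRASH_EXPIRATION_DAYS
               + ", considering the default value");
           days = Integer.parseInt(CarbonCommonConstants.CARBON_TRASH_EXPIRATION_DAYS_DEFAULT);
         }
         // a long on the left-hand side keeps the multiplication in long arithmetic
         long millisecondsInADay = TimeUnit.DAYS.toMillis(1);
         return millisecondsInADay * days;
       }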

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
##########
@@ -3441,4 +3443,33 @@ public static void agingTempFolderForIndexServer(long agingTime)throws
       });
     }
   }
+
+  /**
+   * The below method tries to get the segment lock for the given segment.
+   */
+  public static boolean tryGettingSegmentLock(LoadMetadataDetails oneLoad,
+      AbsoluteTableIdentifier absoluteTableIdentifier) {
+    ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
+            CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
+    boolean canBeDeleted;
+    try {
+      if (segmentLock.lockWithRetries(CarbonCommonConstants
+          .NUMBER_OF_TRIES_FOR_CARBON_LOCK_DEFAULT, CarbonCommonConstants
+          .MAX_TIMEOUT_FOR_CARBON_LOCK_DEFAULT)) {

Review comment:
       If it uses the default parameters, it is better to invoke the no-arg method instead:
   segmentLock.lockWithRetries()

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
##########
@@ -3441,4 +3443,33 @@ public static void agingTempFolderForIndexServer(long agingTime)throws
       });
     }
   }
+
+  /**
+   * The below method tries to get the segment lock for the given segment.
+   */
+  public static boolean tryGettingSegmentLock(LoadMetadataDetails oneLoad,
+      AbsoluteTableIdentifier absoluteTableIdentifier) {
+    ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
+            CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
+    boolean canBeDeleted;
+    try {
+      if (segmentLock.lockWithRetries(CarbonCommonConstants
+          .NUMBER_OF_TRIES_FOR_CARBON_LOCK_DEFAULT, CarbonCommonConstants
+          .MAX_TIMEOUT_FOR_CARBON_LOCK_DEFAULT)) {
+        LOGGER.info("Info: Acquired segment lock on segment: " + oneLoad.getLoadName() + ". It " +
+                "can be deleted as load is not in progress");
+        canBeDeleted = true;
+      } else {
+        LOGGER.info("Info: Load in progress for segment" + oneLoad.getLoadName());
+        canBeDeleted = false;
+      }
+    } finally {
+      if (segmentLock.unlock()) {
+        LOGGER.info("Info: Segment lock on segment:" + oneLoad.getLoadName() + " is released");
+      } else {
+        LOGGER.error("Error: Unable to release segment lock on : " + oneLoad.getLoadName());

Review comment:
       No need to add "Info" and "Error" to the log message; log4j adds the level itself.

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
##########
@@ -3441,4 +3443,33 @@ public static void agingTempFolderForIndexServer(long agingTime)throws
       });
     }
   }
+
+  /**
+   * The below method tries to get the segment lock for the given segment.
+   */
+  public static boolean tryGettingSegmentLock(LoadMetadataDetails oneLoad,
+      AbsoluteTableIdentifier absoluteTableIdentifier) {
+    ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
+            CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
+    boolean canBeDeleted;

Review comment:
       Why does tryGettingSegmentLock have a canBeDeleted variable?
   Please remove canBeDeleted and change the method name to canGetSegmentLock.
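   
   Taken together with the earlier lockWithRetries() and log-prefix comments, a sketch of what the method could look like (an illustration, not the final patch):
   
       public static boolean canGetSegmentLock(LoadMetadataDetails oneLoad,
           AbsoluteTableIdentifier absoluteTableIdentifier) {
         ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
             CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
         // the lock can only be acquired when no load is in progress for the segment
         boolean acquired = segmentLock.lockWithRetries();
         if (acquired) {
           LOGGER.info("Acquired segment lock on segment: " + oneLoad.getLoadName()
               + ". It can be deleted as load is not in progress");
           if (!segmentLock.unlock()) {
             LOGGER.error("Unable to release segment lock on: " + oneLoad.getLoadName());
           }
         } else {
           LOGGER.info("Load in progress for segment " + oneLoad.getLoadName());
         }
         return acquired;
       }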
   

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanUtil.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.index.IndexStoreManager;
+import org.apache.carbondata.core.index.Segment;
+import org.apache.carbondata.core.index.TableIndex;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the code used in the clean files command, to delete the load folders and move the data
+ * to the trash folder
+ */
+public final class CleanUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonUtil.class.getName());
+
+  public static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable,
+      LoadMetadataDetails[] newAddedLoadHistoryList, boolean isForceDelete,
+      List<PartitionSpec> specs) {
+    LoadMetadataDetails[] currentDetails =
+        SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
+    physicalFactAndMeasureMetadataDeletion(carbonTable,
+        currentDetails,
+        isForceDelete,
+        specs,
+        currentDetails);
+    if (newAddedLoadHistoryList != null && newAddedLoadHistoryList.length > 0) {
+      physicalFactAndMeasureMetadataDeletion(carbonTable,
+          newAddedLoadHistoryList,
+          isForceDelete,
+          specs,
+          currentDetails);
+    }
+  }
+
+  /**
+   * Delete the invalid data physically from table.
+   * @param carbonTable table
+   * @param loadDetails Load details which need clean up
+   * @param isForceDelete is Force delete requested by user
+   * @param specs Partition specs
+   * @param currLoadDetails Current table status load details which are required for update manager.
+   */
+  private static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable,
+      LoadMetadataDetails[] loadDetails, boolean isForceDelete, List<PartitionSpec> specs,
+      LoadMetadataDetails[] currLoadDetails) {
+    List<TableIndex> indexes = new ArrayList<>();
+    try {
+      for (TableIndex index : IndexStoreManager.getInstance().getAllCGAndFGIndexes(carbonTable)) {
+        if (index.getIndexSchema().isIndex()) {
+          indexes.add(index);
+        }
+      }
+    } catch (IOException e) {
+      LOGGER.warn(String.format(
+          "Failed to get indexes for %s.%s, therefore the index files could not be cleaned.",
+          carbonTable.getAbsoluteTableIdentifier().getDatabaseName(),
+          carbonTable.getAbsoluteTableIdentifier().getTableName()));
+    }
+    SegmentUpdateStatusManager updateStatusManager =
+        new SegmentUpdateStatusManager(carbonTable, currLoadDetails);
+    for (final LoadMetadataDetails oneLoad : loadDetails) {
+      if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) {
+        try {
+          if (oneLoad.getSegmentFile() != null) {
+            SegmentFileStore.deleteSegment(carbonTable.getAbsoluteTableIdentifier().getTablePath(),
+                new Segment(oneLoad.getLoadName(), oneLoad.getSegmentFile()), specs,
+                updateStatusManager);
+          } else {
+            String path = getSegmentPath(carbonTable.getAbsoluteTableIdentifier(), oneLoad);
+            boolean status = false;
+            if (FileFactory.isFileExist(path)) {
+              CarbonFile file = FileFactory.getCarbonFile(path);
+              CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() {
+
+                @Override
+                public boolean accept(CarbonFile file) {
+                  return (CarbonTablePath.isCarbonDataFile(file.getName()) ||
+                    CarbonTablePath.isCarbonIndexFile(file.getName()));
+                }
+              });
+
+              //if there are no fact and msr metadata files present then no need to keep
+              //entry in metadata.
+              if (filesToBeDeleted.length == 0) {
+                status = true;
+              } else {
+                for (CarbonFile eachFile : filesToBeDeleted) {
+                  if (!eachFile.delete()) {
+                    LOGGER.warn("Unable to delete the file as per delete command " + eachFile
+                        .getAbsolutePath());
+                    status = false;
+                  } else {
+                    status = true;
+                  }
+                }
+              }
+              // need to delete the complete folder.
+              if (status) {
+                if (!file.delete()) {
+                  LOGGER.warn("Unable to delete the folder as per delete command " + file
+                      .getAbsolutePath());
+                }
+              }
+
+            }
+          }
+          List<Segment> segments = new ArrayList<>(1);
+          for (TableIndex index : indexes) {
+            segments.clear();
+            segments.add(new Segment(oneLoad.getLoadName()));
+            index.deleteIndexData(segments);
+          }
+        } catch (Exception e) {
+          LOGGER.warn("Unable to delete the file as per delete command " + oneLoad.getLoadName());
+        }
+      }
+    }
+  }
+
+  private static boolean checkIfLoadCanBeDeletedPhysically(LoadMetadataDetails oneLoad,
+                                                           boolean isForceDelete) {
+    // Check if the segment is added externally and path is set then do not delete it
+    if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus()
+        || SegmentStatus.COMPACTED == oneLoad.getSegmentStatus()) && (oneLoad.getPath() == null
+        || oneLoad.getPath().equalsIgnoreCase("NA"))) {
+      if (isForceDelete) {
+        return true;
+      }
+      long deletionTime = oneLoad.getModificationOrDeletionTimestamp();
+      return CarbonUpdateUtil.isMaxQueryTimeoutExceeded(deletionTime);
+    }
+    return false;
+  }
+
+
+  /**
+   * returns segment path
+   *
+   * @param identifier
+   * @param oneLoad
+   * @return
+   */
+  private static String getSegmentPath(AbsoluteTableIdentifier identifier,
+      LoadMetadataDetails oneLoad) {
+    String segmentId = oneLoad.getLoadName();
+    return CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId);
+  }

Review comment:
       This method is not needed; inline the call instead:
   CarbonTablePath.getSegmentPath(identifier.getTablePath(), oneLoad.getLoadName());

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanUtil.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.index.IndexStoreManager;
+import org.apache.carbondata.core.index.Segment;
+import org.apache.carbondata.core.index.TableIndex;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the code used in the clean files command, to delete the load folders and move the data
+ * to the trash folder
+ */
+public final class CleanUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonUtil.class.getName());
+
+  public static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable,

Review comment:
       change method name to physicalDataAndIndexDeletion

##########
File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
##########
@@ -1427,6 +1427,23 @@ private CarbonCommonConstants() {
 
   public static final String BITSET_PIPE_LINE_DEFAULT = "true";
 
+  /**
+   * this is the user defined time(in days), when a specific timestamp subdirectory in
+   * trash folder will expire
+   */
+  @CarbonProperty
+  public static final String CARBON_TRASH_EXPIRATION_DAYS = "carbon.trash.expiration.days";
+
+  /**
+   * Default expiration time of trash folder is 3 days.
+   */
+  public static final String CARBON_TRASH_EXPIRATION_DAYS_DEFAULT = "3";
+
+  /**
+   * Trash folder temp name
+   */
+  public static final String CARBON_TRASH_FOLDER_NAME = ".Trash";

Review comment:
       move it into CarbonTablePath

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanUtil.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.index.IndexStoreManager;
+import org.apache.carbondata.core.index.Segment;
+import org.apache.carbondata.core.index.TableIndex;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the code used in the clean files command, to delete the load folders and move the data
+ * to the trash folder
+ */
+public final class CleanUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonUtil.class.getName());
+
+  public static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable,
+      LoadMetadataDetails[] newAddedLoadHistoryList, boolean isForceDelete,
+      List<PartitionSpec> specs) {
+    LoadMetadataDetails[] currentDetails =
+        SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
+    physicalFactAndMeasureMetadataDeletion(carbonTable,
+        currentDetails,
+        isForceDelete,
+        specs,
+        currentDetails);
+    if (newAddedLoadHistoryList != null && newAddedLoadHistoryList.length > 0) {
+      physicalFactAndMeasureMetadataDeletion(carbonTable,
+          newAddedLoadHistoryList,
+          isForceDelete,
+          specs,
+          currentDetails);
+    }
+  }
+
+  /**
+   * Delete the invalid data physically from table.
+   * @param carbonTable table
+   * @param loadDetails Load details which need clean up
+   * @param isForceDelete is Force delete requested by user
+   * @param specs Partition specs
+   * @param currLoadDetails Current table status load details which are required for update manager.
+   */
+  private static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable,
+      LoadMetadataDetails[] loadDetails, boolean isForceDelete, List<PartitionSpec> specs,
+      LoadMetadataDetails[] currLoadDetails) {
+    List<TableIndex> indexes = new ArrayList<>();
+    try {
+      for (TableIndex index : IndexStoreManager.getInstance().getAllCGAndFGIndexes(carbonTable)) {
+        if (index.getIndexSchema().isIndex()) {
+          indexes.add(index);
+        }
+      }
+    } catch (IOException e) {
+      LOGGER.warn(String.format(
+          "Failed to get indexes for %s.%s, therefore the index files could not be cleaned.",
+          carbonTable.getAbsoluteTableIdentifier().getDatabaseName(),
+          carbonTable.getAbsoluteTableIdentifier().getTableName()));
+    }
+    SegmentUpdateStatusManager updateStatusManager =
+        new SegmentUpdateStatusManager(carbonTable, currLoadDetails);
+    for (final LoadMetadataDetails oneLoad : loadDetails) {
+      if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) {
+        try {
+          if (oneLoad.getSegmentFile() != null) {
+            SegmentFileStore.deleteSegment(carbonTable.getAbsoluteTableIdentifier().getTablePath(),
+                new Segment(oneLoad.getLoadName(), oneLoad.getSegmentFile()), specs,
+                updateStatusManager);
+          } else {
+            String path = getSegmentPath(carbonTable.getAbsoluteTableIdentifier(), oneLoad);
+            boolean status = false;
+            if (FileFactory.isFileExist(path)) {
+              CarbonFile file = FileFactory.getCarbonFile(path);
+              CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() {
+
+                @Override
+                public boolean accept(CarbonFile file) {
+                  return (CarbonTablePath.isCarbonDataFile(file.getName()) ||
+                    CarbonTablePath.isCarbonIndexFile(file.getName()));
+                }
+              });
+
+              //if there are no fact and msr metadata files present then no need to keep
+              //entry in metadata.
+              if (filesToBeDeleted.length == 0) {
+                status = true;
+              } else {
+                for (CarbonFile eachFile : filesToBeDeleted) {
+                  if (!eachFile.delete()) {
+                    LOGGER.warn("Unable to delete the file as per delete command " + eachFile
+                        .getAbsolutePath());
+                    status = false;
+                  } else {
+                    status = true;
+                  }
+                }
+              }
+              // need to delete the complete folder.
+              if (status) {
+                if (!file.delete()) {
+                  LOGGER.warn("Unable to delete the folder as per delete command " + file
+                      .getAbsolutePath());
+                }
+              }
+
+            }
+          }
+          List<Segment> segments = new ArrayList<>(1);
+          for (TableIndex index : indexes) {
+            segments.clear();

Review comment:
       move to  after line 148

##########
File path: core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.CarbonFileException;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the trash folder in carbondata. This class has methods to copy data to the trash and
+ * remove data from the trash.
+ */
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+          LogServiceFactory.getLogService(CarbonUtil.class.getName());
+
+  /**
+   * The below method copies a complete file to the trash folder. Provide the necessary
+   * timestamp and the segment number in the suffixToAdd variable, so that the proper folder is
+   * created in the trash folder.
+   *
+   * @param carbonTablePath table path of the carbon table
+   * @param pathOfFileToCopy the file which is to be moved to the trash folder
+   * @param suffixToAdd timestamp, partition folder(if any) and segment number
+   * @return
+   */
+  public static void copyDataToTrashFolderByFile(String carbonTablePath, String pathOfFileToCopy,
+      String suffixToAdd) {
+    String trashFolderPath = CarbonTablePath.getTrashFolderPath(carbonTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd;
+    try {
+      if (new File(pathOfFileToCopy).exists()) {
+        FileUtils.copyFileToDirectory(new File(pathOfFileToCopy), new File(trashFolderPath));
+        LOGGER.info("File: " + pathOfFileToCopy + " successfully copied to the trash folder: "
+                + trashFolderPath);
+      }

Review comment:
       1. Change File to CarbonFile.
   2. This needs to support HDFS and cloud object storage.
   Reuse the following APIs to do it (see the sketch after this list):
       FileFactory.getDataOutputStream
       FileFactory.getDataInputStream
       IOUtils.copyBytes
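   
   A sketch of that approach for a single file (assuming the single-argument FileFactory stream overloads; an illustration, not the final patch):
   
       // sketch: copy one file into the trash through FileFactory streams, so the
       // configured file system (local, HDFS, S3, ...) handles the actual IO
       CarbonFile fileToCopy = FileFactory.getCarbonFile(pathOfFileToCopy);
       String destFilePath = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR
           + fileToCopy.getName();
       try (DataInputStream in = FileFactory.getDataInputStream(pathOfFileToCopy);
            DataOutputStream out = FileFactory.getDataOutputStream(destFilePath)) {
         // close = false: try-with-resources already closes both streams
         IOUtils.copyBytes(in, out, 4096, false);
       }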

##########
File path: core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.CarbonFileException;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the trash folder in carbondata. This class has methods to copy data to the trash and
+ * remove data from the trash.
+ */
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+          LogServiceFactory.getLogService(CarbonUtil.class.getName());
+
+  /**
+   * The below method copies a complete file to the trash folder. Provide the necessary
+   * timestamp and the segment number in the suffixToAdd variable, so that the proper folder is
+   * created in the trash folder.
+   *
+   * @param carbonTablePath table path of the carbon table
+   * @param pathOfFileToCopy the file which is to be moved to the trash folder
+   * @param suffixToAdd timestamp, partition folder(if any) and segment number
+   * @return
+   */
+  public static void copyDataToTrashFolderByFile(String carbonTablePath, String pathOfFileToCopy,
+      String suffixToAdd) {
+    String trashFolderPath = CarbonTablePath.getTrashFolderPath(carbonTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd;
+    try {
+      if (new File(pathOfFileToCopy).exists()) {
+        FileUtils.copyFileToDirectory(new File(pathOfFileToCopy), new File(trashFolderPath));
+        LOGGER.info("File: " + pathOfFileToCopy + " successfully copied to the trash folder: "
+                + trashFolderPath);
+      }
+    } catch (IOException e) {
+      LOGGER.error("Unable to copy " + pathOfFileToCopy + " to the trash folder", e);
+    }
+  }
+
+  /**
+   * The below method copies the complete segment folder to the trash folder. Provide the
+   * necessary timestamp and the segment number in the suffixToAdd variable, so that the proper
+   * folder is created in the trash folder.
+   *
+   * @param carbonTablePath table path of the carbon table
+   * @param path the folder which is to be moved to the trash folder
+   * @param suffixToAdd timestamp, partition folder(if any) and segment number
+   * @return
+   */
+  public static void copyDataToTrashBySegment(CarbonFile path, String carbonTablePath,
+      String suffixToAdd) {
+    String trashFolderPath = CarbonTablePath.getTrashFolderPath(carbonTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd;
+    try {
+      FileUtils.copyDirectory(new File(path.getAbsolutePath()), new File(trashFolderPath));

Review comment:
       This needs to support local, HDFS, and cloud object storage as well; see the recursive sketch below.
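   
   A recursive variant along the same lines (again assuming FileFactory.mkdirs and the single-argument stream overloads; a sketch, not the final patch):
   
       // sketch: copy a segment folder into the trash recursively through the
       // FileFactory stream APIs, so local, HDFS and object stores all work
       private static void copyToTrashRecursively(CarbonFile source, String destDir)
           throws IOException {
         String dest = destDir + CarbonCommonConstants.FILE_SEPARATOR + source.getName();
         if (source.isDirectory()) {
           FileFactory.mkdirs(dest);
           for (CarbonFile child : source.listFiles()) {
             copyToTrashRecursively(child, dest);
           }
         } else {
           try (DataInputStream in = FileFactory.getDataInputStream(source.getAbsolutePath());
                DataOutputStream out = FileFactory.getDataOutputStream(dest)) {
             IOUtils.copyBytes(in, out, 4096, false);
           }
         }
       }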

##########
File path: core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.CarbonFileException;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the trash folder in carbondata. This class has methods to copy data to the trash and
+ * remove data from the trash.
+ */
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+          LogServiceFactory.getLogService(CarbonUtil.class.getName());
+
+  /**
+   * The below method copies a complete file to the trash folder. Provide the necessary
+   * timestamp and the segment number in the suffixToAdd variable, so that the proper folder is
+   * created in the trash folder.
+   *
+   * @param carbonTablePath table path of the carbon table
+   * @param pathOfFileToCopy the file which is to be moved to the trash folder
+   * @param suffixToAdd timestamp, partition folder(if any) and segment number
+   * @return
+   */
+  public static void copyDataToTrashFolderByFile(String carbonTablePath, String pathOfFileToCopy,
+      String suffixToAdd) {
+    String trashFolderPath = CarbonTablePath.getTrashFolderPath(carbonTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd;
+    try {
+      if (new File(pathOfFileToCopy).exists()) {
+        FileUtils.copyFileToDirectory(new File(pathOfFileToCopy), new File(trashFolderPath));
+        LOGGER.info("File: " + pathOfFileToCopy + " successfully copied to the trash folder: "
+                + trashFolderPath);
+      }
+    } catch (IOException e) {
+      LOGGER.error("Unable to copy " + pathOfFileToCopy + " to the trash folder", e);
+    }
+  }
+
+  /**
+   * The below method copies the complete segment folder to the trash folder. Provide the
+   * necessary timestamp and the segment number in the suffixToAdd variable, so that the proper
+   * folder is created in the trash folder.
+   *
+   * @param carbonTablePath table path of the carbon table
+   * @param path the folder which is to be moved to the trash folder
+   * @param suffixToAdd timestamp, partition folder(if any) and segment number
+   * @return
+   */
+  public static void copyDataToTrashBySegment(CarbonFile path, String carbonTablePath,
+      String suffixToAdd) {
+    String trashFolderPath = CarbonTablePath.getTrashFolderPath(carbonTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd;
+    try {
+      FileUtils.copyDirectory(new File(path.getAbsolutePath()), new File(trashFolderPath));
+      LOGGER.info("Segment: " + path.getAbsolutePath() + " has been copied to the trash folder" +
+          " successfully");
+    } catch (IOException e) {
+      LOGGER.error("Unable to create the trash folder and copy data to it", e);
+    }
+  }
+
+  /**
+   * The below method deletes timestamp subdirectories in the trash folder which have expired as
+   * per the user defined expiration time
+   */
+  public static void deleteAllDataFromTrashFolderByTimeStamp(String carbonTablePath, Long timeStamp)
+          throws IOException {
+    String pathOfTrashFolder = CarbonTablePath.getTrashFolderPath(carbonTablePath);
+    // Deleting the timestamp-based subdirectories in the trash folder by the given timestamp.
+    if (FileFactory.isFileExist(pathOfTrashFolder)) {
+      try {
+        List<CarbonFile> carbonFileList = FileFactory.getFolderList(pathOfTrashFolder);
+        for (CarbonFile carbonFile : carbonFileList) {
+          String[] splitPath = carbonFile.getAbsolutePath().split(CarbonCommonConstants

Review comment:
       Better to check carbonFile.getName() directly and avoid the extra IO of resolving the absolute path.
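   
   For example (expirationMillis standing in for the configured expiration window):
   
       // sketch: the subdirectory name is the creation timestamp itself,
       // so it can be parsed without touching the file system again
       long folderTimestamp = Long.parseLong(carbonFile.getName());
       if (folderTimestamp + expirationMillis < System.currentTimeMillis()) {
         deleteDataFromTrashFolderByFile(carbonFile);
       }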
   

##########
File path: core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.CarbonFileException;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the trash folder in carbondata. This class has methods to copy data to the trash and
+ * remove data from the trash.
+ */
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+          LogServiceFactory.getLogService(CarbonUtil.class.getName());
+
+  /**
+   * The below method copies a complete file to the trash folder. Provide the necessary
+   * timestamp and the segment number in the suffixToAdd variable, so that the proper folder is
+   * created in the trash folder.
+   *
+   * @param carbonTablePath table path of the carbon table
+   * @param pathOfFileToCopy the file which is to be moved to the trash folder
+   * @param suffixToAdd timestamp, partition folder(if any) and segment number
+   * @return
+   */
+  public static void copyDataToTrashFolderByFile(String carbonTablePath, String pathOfFileToCopy,
+      String suffixToAdd) {
+    String trashFolderPath = CarbonTablePath.getTrashFolderPath(carbonTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd;
+    try {
+      if (new File(pathOfFileToCopy).exists()) {
+        FileUtils.copyFileToDirectory(new File(pathOfFileToCopy), new File(trashFolderPath));
+        LOGGER.info("File: " + pathOfFileToCopy + " successfully copied to the trash folder: "
+                + trashFolderPath);
+      }
+    } catch (IOException e) {
+      LOGGER.error("Unable to copy " + pathOfFileToCopy + " to the trash folder", e);
+    }
+  }
+
+  /**
+   * The below method copies the complete segment folder to the trash folder. Provide the
+   * necessary timestamp and the segment number in the suffixToAdd variable, so that the proper
+   * folder is created in the trash folder.
+   *
+   * @param carbonTablePath table path of the carbon table
+   * @param path the folder which is to be moved to the trash folder
+   * @param suffixToAdd timestamp, partition folder(if any) and segment number
+   * @return
+   */
+  public static void copyDataToTrashBySegment(CarbonFile path, String carbonTablePath,
+      String suffixToAdd) {
+    String trashFolderPath = CarbonTablePath.getTrashFolderPath(carbonTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd;
+    try {
+      FileUtils.copyDirectory(new File(path.getAbsolutePath()), new File(trashFolderPath));
+      LOGGER.info("Segment: " + path.getAbsolutePath() + " has been copied to the trash folder" +
+          " successfully");
+    } catch (IOException e) {
+      LOGGER.error("Unable to create the trash folder and copy data to it", e);
+    }
+  }
+
+  /**
+   * The below method deletes timestamp subdirectories in the trash folder which have expired as
+   * per the user defined expiration time
+   */
+  public static void deleteAllDataFromTrashFolderByTimeStamp(String carbonTablePath, Long timeStamp)

Review comment:
       Use long timeStamp to avoid unboxing.

##########
File path: core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.CarbonFileException;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the trash folder in carbondata. This class has methods to copy data to the trash and
+ * remove data from the trash.
+ */
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+          LogServiceFactory.getLogService(CarbonUtil.class.getName());
+
+  /**
+   * The below method copies a complete file to the trash folder. Provide the necessary
+   * timestamp and the segment number in the suffixToAdd variable, so that the proper folder is
+   * created in the trash folder.
+   *
+   * @param carbonTablePath table path of the carbon table
+   * @param pathOfFileToCopy the file which is to be moved to the trash folder
+   * @param suffixToAdd timestamp, partition folder(if any) and segment number
+   * @return
+   */
+  public static void copyDataToTrashFolderByFile(String carbonTablePath, String pathOfFileToCopy,
+      String suffixToAdd) {
+    String trashFolderPath = CarbonTablePath.getTrashFolderPath(carbonTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd;
+    try {
+      if (new File(pathOfFileToCopy).exists()) {
+        FileUtils.copyFileToDirectory(new File(pathOfFileToCopy), new File(trashFolderPath));
+        LOGGER.info("File: " + pathOfFileToCopy + " successfully copied to the trash folder: "
+                + trashFolderPath);
+      }
+    } catch (IOException e) {
+      LOGGER.error("Unable to copy " + pathOfFileToCopy + " to the trash folder", e);
+    }
+  }
+
+  /**
+   * The below method copies the complete segment folder to the trash folder. Provide the
+   * necessary timestamp and the segment number in the suffixToAdd variable, so that the proper
+   * folder is created in the trash folder.
+   *
+   * @param carbonTablePath table path of the carbon table
+   * @param path the folder which is to be moved to the trash folder
+   * @param suffixToAdd timestamp, partition folder(if any) and segment number
+   * @return
+   */
+  public static void copyDataToTrashBySegment(CarbonFile path, String carbonTablePath,
+      String suffixToAdd) {
+    String trashFolderPath = CarbonTablePath.getTrashFolderPath(carbonTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd;
+    try {
+      FileUtils.copyDirectory(new File(path.getAbsolutePath()), new File(trashFolderPath));
+      LOGGER.info("Segment: " + path.getAbsolutePath() + " has been copied to the trash folder" +
+          " successfully");
+    } catch (IOException e) {
+      LOGGER.error("Unable to create the trash folder and copy data to it", e);
+    }
+  }
+
+  /**
+   * The below method deletes timestamp subdirectories in the trash folder which have expired as
+   * per the user defined expiration time
+   */
+  public static void deleteAllDataFromTrashFolderByTimeStamp(String carbonTablePath, Long timeStamp)
+          throws IOException {
+    String pathOfTrashFolder = CarbonTablePath.getTrashFolderPath(carbonTablePath);
+    // Deleting the timestamp-based subdirectories in the trash folder by the given timestamp.
+    if (FileFactory.isFileExist(pathOfTrashFolder)) {
+      try {
+        List<CarbonFile> carbonFileList = FileFactory.getFolderList(pathOfTrashFolder);
+        for (CarbonFile carbonFile : carbonFileList) {
+          String[] splitPath = carbonFile.getAbsolutePath().split(CarbonCommonConstants
+            .FILE_SEPARATOR);
+          Long currentTime = Long.valueOf(new Timestamp(System.currentTimeMillis()).getTime());
+          Long givenTime = Long.valueOf(splitPath[splitPath.length - 1]);
+          // If the timeStamp subdirectory has expired as per the user-defined
+          // expiration value, delete the complete timeStamp subdirectory
+          if (givenTime + timeStamp < currentTime) {
+            deleteDataFromTrashFolderByFile(carbonFile);
+          } else {
+            LOGGER.info("Timestamp folder not expired: " + carbonFile.getAbsolutePath());
+          }
+        }
+      } catch (IOException e) {
+        LOGGER.error("Error during deleting from trash folder", e);
+      }
+    }
+  }
+
+  /**
+   * The below method deletes all the files and folders in the trash folder of a carbon table.
+   */
+  public static void deleteAllDataFromTrashFolder(String carbonTablePath)
+          throws IOException {
+    String pathOfTrashFolder = CarbonTablePath.getTrashFolderPath(carbonTablePath);
+    // if the trash folder exists delete the contents of the trash folder
+    if (FileFactory.isFileExist(pathOfTrashFolder)) {
+      try {
+        List<CarbonFile> carbonFileList = FileFactory.getFolderList(pathOfTrashFolder);
+        for (CarbonFile carbonFile : carbonFileList) {
+          deleteDataFromTrashFolderByFile(carbonFile);
+        }
+      } catch (IOException e) {
+        LOGGER.error("Error during deleting from trash folder", e);
+      }
+    }
+  }
+
+  /**
+   * The below method deletes by carbonFiles, if it is a directory, it will delete recursively
+   */
+  private static void deleteDataFromTrashFolderByFile(CarbonFile carbonFile) {

Review comment:
       Actually, this method is always used to delete a timestamp folder,
   so it would be better to improve the method name accordingly.

##########
File path: core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.CarbonFileException;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the trash folder in carbondata. This class has methods to copy data to the trash and
+ * remove data from the trash.
+ */
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+          LogServiceFactory.getLogService(CarbonUtil.class.getName());
+
+  /**
+   * The below method copies a complete file to the trash folder. Provide the necessary
+   * timestamp and the segment number in the suffixToAdd variable, so that the proper folder is
+   * created in the trash folder.
+   *
+   * @param carbonTablePath table path of the carbon table
+   * @param pathOfFileToCopy the file which is to be moved to the trash folder
+   * @param suffixToAdd timestamp, partition folder(if any) and segment number
+   * @return
+   */
+  public static void copyDataToTrashFolderByFile(String carbonTablePath, String pathOfFileToCopy,
+      String suffixToAdd) {
+    String trashFolderPath = CarbonTablePath.getTrashFolderPath(carbonTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd;
+    try {
+      if (new File(pathOfFileToCopy).exists()) {
+        FileUtils.copyFileToDirectory(new File(pathOfFileToCopy), new File(trashFolderPath));
+        LOGGER.info("File: " + pathOfFileToCopy + " successfully copied to the trash folder: "
+                + trashFolderPath);
+      }
+    } catch (IOException e) {
+      LOGGER.error("Unable to copy " + pathOfFileToCopy + " to the trash folder", e);
+    }
+  }
+
+  /**
+   * The below method copies the complete segment folder to the trash folder. Provide the
+   * necessary timestamp and the segment number in the suffixToAdd variable, so that the proper
+   * folder is created in the trash folder.
+   *
+   * @param carbonTablePath table path of the carbon table
+   * @param path the folder which is to be moved to the trash folder
+   * @param suffixToAdd timestamp, partition folder(if any) and segment number
+   * @return
+   */
+  public static void copyDataToTrashBySegment(CarbonFile path, String carbonTablePath,
+      String suffixToAdd) {
+    String trashFolderPath = CarbonTablePath.getTrashFolderPath(carbonTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd;
+    try {
+      FileUtils.copyDirectory(new File(path.getAbsolutePath()), new File(trashFolderPath));
+      LOGGER.info("Segment: " + path.getAbsolutePath() + " has been copied to the trash folder" +
+          " successfully");
+    } catch (IOException e) {
+      LOGGER.error("Unable to create the trash folder and copy data to it", e);
+    }
+  }
+
+  /**
+   * The below method deletes timestamp subdirectories in the trash folder which have expired as
+   * per the user defined expiration time
+   */
+  public static void deleteAllDataFromTrashFolderByTimeStamp(String carbonTablePath, Long timeStamp)
+          throws IOException {
+    String pathOfTrashFolder = CarbonTablePath.getTrashFolderPath(carbonTablePath);
+    // Delete the timestamp-based subdirectories in the trash folder as per the given expiration time.
+    if (FileFactory.isFileExist(pathOfTrashFolder)) {
+      try {
+        List<CarbonFile> carbonFileList = FileFactory.getFolderList(pathOfTrashFolder);
+        for (CarbonFile carbonFile : carbonFileList) {
+          String[] splitPath = carbonFile.getAbsolutePath().split(CarbonCommonConstants
+            .FILE_SEPARATOR);
+          Long currentTime = Long.valueOf(new Timestamp(System.currentTimeMillis()).getTime());
+          Long givenTime = Long.valueOf(splitPath[splitPath.length - 1]);
+          // If the timestamp subdirectory has expired as per the user defined value,
+          // delete the complete timestamp subdirectory
+          if (givenTime + timeStamp < currentTime) {
+            deleteDataFromTrashFolderByFile(carbonFile);
+          } else {
+            LOGGER.info("Timestamp folder not expired: " + carbonFile.getAbsolutePath());

Review comment:
       avoid using carbonFile.getAbsolutePath():
       pathOfTrashFolder + "/" + carbonFile.getName

##########
File path: docs/cleanfiles.md
##########
@@ -0,0 +1,78 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to you under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+    
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+
+
+## CLEAN FILES
+
+The clean files command is used to remove the Compacted, Marked For Delete, stale In Progress and Partial (segments which are missing from the table status file but whose data is present)
+ segments from the store.
+
+ Clean Files Command
+   ```
+   CLEAN FILES FOR TABLE TABLE_NAME
+   ```
+
+
+### TRASH FOLDER
+
+  Carbondata supports a Trash Folder which is used as a redundant folder where all stale carbondata segments are moved during the clean files operation.
+  This trash folder is maintained inside the table path. It is a hidden folder (.Trash). The segments that are moved to the trash folder are kept under a timestamp
+  subfolder (the timestamp at which the clean files operation was called). This helps the user to list segments by timestamp. By default, every timestamp sub-directory has an expiration
+  time of 3 days since that timestamp, which can be configured by the user using the following carbon property

Review comment:
       change to 7 days

##########
File path: docs/cleanfiles.md
##########
@@ -0,0 +1,78 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to you under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+    
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+
+
+## CLEAN FILES
+
+The clean files command is used to remove the Compacted, Marked For Delete, stale In Progress and Partial (segments which are missing from the table status file but whose data is present)

Review comment:
       Partial => partial

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanUtil.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.index.IndexStoreManager;
+import org.apache.carbondata.core.index.Segment;
+import org.apache.carbondata.core.index.TableIndex;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the code used in the clean files command, to delete the load folders and move the
+ * data to the trash folder
+ */
+public final class CleanUtil {

Review comment:
       change the class name, Clean what?

##########
File path: docs/cleanfiles.md
##########
@@ -0,0 +1,78 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to you under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+    
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+
+
+## CLEAN FILES
+
+The clean files command is used to remove the Compacted, Marked For Delete, stale In Progress and Partial (segments which are missing from the table status file but whose data is present)
+ segments from the store.
+
+ Clean Files Command
+   ```
+   CLEAN FILES FOR TABLE TABLE_NAME
+   ```
+
+
+### TRASH FOLDER
+
+  Carbondata supports a Trash Folder which is used as a redundant folder where all stale carbondata segments are moved during the clean files operation.
+  This trash folder is maintained inside the table path. It is a hidden folder (.Trash). The segments that are moved to the trash folder are kept under a timestamp
+  subfolder (the timestamp at which the clean files operation was called). This helps the user to list segments by timestamp. By default, every timestamp sub-directory has an expiration
+  time of 3 days since that timestamp, which can be configured by the user using the following carbon property
+   ```
+   carbon.trash.expiration.time = "Number of days"
+   ```
+  Once a timestamp subdirectory has expired as per the configured expiration value, it is deleted from the trash folder during the subsequent clean files command.
+  
+
+
+
+### DRY RUN
+  Support for a dry run is provided before the actual clean files operation. The dry run will list all the segments which are going to be manipulated during
+  the clean files operation. The dry run result will show the current location of each segment (it can be in the FACT folder, a partition folder or the trash folder), where that segment
+  will be moved (to the trash folder or deleted from the store) and the number of days left before it expires once the actual operation is called.
+  
+
+  ```
+  CLEAN FILES FOR TABLE TABLE_NAME options('dry_run'='true')
+  ```
+
+### FORCE DELETE TRASH
+The force option with the clean files command deletes all the files and folders from the trash folder.

Review comment:
       does it clean segments too? is there any difference?

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import java.sql.Timestamp
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableClean <true> and cleans garbage segments
+   * (MARKED_FOR_DELETE state) if forceTableClean <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> for force clean, it will delete all data
+   *                               <false> it will clean garbage segments (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive partition details
+   */
+  def cleanFiles(
+    dbName: String,
+    tableName: String,
+    tablePath: String,
+    carbonTable: CarbonTable,
+    forceTableClean: Boolean,
+    currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+    truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)
+        }
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+        currentTablePartitions match {
+          case Some(partitions) =>
+            SegmentFileStore.cleanSegments(
+              carbonTable,
+              currentTablePartitions.map(_.asJava).orNull,
+              true)
+          case _ =>
+        }
+      }
+    } finally {
+      if (currentTablePartitions.equals(None)) {
+        cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec])
+      } else {
+        cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList)
+      }
+
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+
+  /**
+   * delete partition folders recursively
+   *
+   * @param carbonTable
+   * @param partitionSpecList
+   */
+  def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable,
+      partitionSpecList: List[PartitionSpec]): Unit = {
+    if (carbonTable != null && carbonTable.isHivePartitionTable) {
+      val loadMetadataDetails = SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetadataPath)
+
+      val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath)
+
+      // list all files from table path
+      val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
+      loadMetadataDetails.foreach { metadataDetail =>
+        if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
+          metadataDetail.getSegmentFile == null) {
+          val loadStartTime: Long = metadataDetail.getLoadStartTime
+          // delete all files of @loadStartTime from table path
+          cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime)
+          partitionSpecList.foreach {
+            partitionSpec =>
+              val partitionLocation = partitionSpec.getLocation
+              // For partition folder outside the tablePath
+              if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
+                val partitionCarbonFile = FileFactory
+                  .getCarbonFile(partitionLocation.toString)
+                // list all files from partitionLocation
+                val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true)
+                // delete all files of @loadStartTime from externalPath
+                cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime)
+              }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Compare CarbonFile Timestamp and delete files.
+   *
+   * @param carbonFiles
+   * @param timestamp
+   */
+  private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],

Review comment:
       CleanUtil also contains some similar functional methods

##########
File path: core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.CarbonFileException;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the trash folder in carbondata. This class has methods to copy data to the trash and
+ * remove data from the trash.
+ */
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+          LogServiceFactory.getLogService(CarbonUtil.class.getName());
+
+  /**
+   * The below method copies a complete file to the trash folder. Provide the necessary
+   * timestamp and the segment number in the suffixToAdd variable, so that the proper folder is
+   * created in the trash folder.
+   *
+   * @param carbonTablePath table path of the carbon table
+   * @param pathOfFileToCopy the file which is to be moved to the trash folder
+   * @param suffixToAdd timestamp, partition folder (if any) and segment number
+   */
+  public static void copyDataToTrashFolderByFile(String carbonTablePath, String pathOfFileToCopy,
+      String suffixToAdd) {
+    String trashFolderPath = CarbonTablePath.getTrashFolderPath(carbonTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd;
+    try {
+      if (new File(pathOfFileToCopy).exists()) {
+        FileUtils.copyFileToDirectory(new File(pathOfFileToCopy), new File(trashFolderPath));
+        LOGGER.info("File: " + pathOfFileToCopy + " successfully copied to the trash folder: "
+                + trashFolderPath);
+      }
+    } catch (IOException e) {
+      LOGGER.error("Unable to copy " + pathOfFileToCopy + " to the trash folder", e);
+    }
+  }
+
+  /**
+   * The below method copies the complete segment folder to the trash folder. Provide the
+   * necessary timestamp and the segment number in the suffixToAdd variable, so that the proper
+   * folder is created in the trash folder.
+   *
+   * @param carbonTablePath table path of the carbon table
+   * @param path the folder which is to be moved to the trash folder
+   * @param suffixToAdd timestamp, partition folder (if any) and segment number
+   */
+  public static void copyDataToTrashBySegment(CarbonFile path, String carbonTablePath,
+      String suffixToAdd) {
+    String trashFolderPath = CarbonTablePath.getTrashFolderPath(carbonTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd;
+    try {
+      FileUtils.copyDirectory(new File(path.getAbsolutePath()), new File(trashFolderPath));
+      LOGGER.info("Segment: " + path.getAbsolutePath() + " has been copied to the trash folder" +
+          " successfully");
+    } catch (IOException e) {
+      LOGGER.error("Unable to create the trash folder and copy data to it", e);
+    }
+  }
+
+  /**
+   * The below method deletes timestamp subdirectories in the trash folder which have expired as
+   * per the user defined expiration time
+   */
+  public static void deleteAllDataFromTrashFolderByTimeStamp(String carbonTablePath, Long timeStamp)
+          throws IOException {
+    String pathOfTrashFolder = CarbonTablePath.getTrashFolderPath(carbonTablePath);
+    // Delete the timestamp-based subdirectories in the trash folder as per the given expiration time.
+    if (FileFactory.isFileExist(pathOfTrashFolder)) {
+      try {
+        List<CarbonFile> carbonFileList = FileFactory.getFolderList(pathOfTrashFolder);
+        for (CarbonFile carbonFile : carbonFileList) {
+          String[] splitPath = carbonFile.getAbsolutePath().split(CarbonCommonConstants
+            .FILE_SEPARATOR);
+          Long currentTime = Long.valueOf(new Timestamp(System.currentTimeMillis()).getTime());
+          Long givenTime = Long.valueOf(splitPath[splitPath.length - 1]);
+          // If the timestamp subdirectory has expired as per the user defined value,
+          // delete the complete timestamp subdirectory
+          if (givenTime + timeStamp < currentTime) {
+            deleteDataFromTrashFolderByFile(carbonFile);
+          } else {
+            LOGGER.info("Timestamp folder not expired: " + carbonFile.getAbsolutePath());
+          }
+        }
+      } catch (IOException e) {
+        LOGGER.error("Error during deleting from trash folder", e);

Review comment:
       this message appears 4 times, please differentiate them

##########
File path: docs/dml-of-carbondata.md
##########
@@ -560,5 +561,4 @@ CarbonData DML statements are documented here,which includes:
 
   Clean the segments which are compacted:
   ```
-  CLEAN FILES FOR TABLE carbon_table
-  ```

Review comment:
       why remove the code quote?

##########
File path: core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.CarbonFileException;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the trash folder in carbondata. This class has methods to copy data to the trash and
+ * remove data from the trash.
+ */
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+          LogServiceFactory.getLogService(CarbonUtil.class.getName());
+
+  /**
+   * The below method copies a complete file to the trash folder. Provide the necessary
+   * timestamp and the segment number in the suffixToAdd variable, so that the proper folder is
+   * created in the trash folder.
+   *
+   * @param carbonTablePath table path of the carbon table
+   * @param pathOfFileToCopy the file which is to be moved to the trash folder
+   * @param suffixToAdd timestamp, partition folder (if any) and segment number
+   */
+  public static void copyDataToTrashFolderByFile(String carbonTablePath, String pathOfFileToCopy,
+      String suffixToAdd) {
+    String trashFolderPath = CarbonTablePath.getTrashFolderPath(carbonTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd;
+    try {
+      if (new File(pathOfFileToCopy).exists()) {
+        FileUtils.copyFileToDirectory(new File(pathOfFileToCopy), new File(trashFolderPath));
+        LOGGER.info("File: " + pathOfFileToCopy + " successfully copied to the trash folder: "
+                + trashFolderPath);
+      }
+    } catch (IOException e) {
+      LOGGER.error("Unable to copy " + pathOfFileToCopy + " to the trash folder", e);
+    }
+  }
+
+  /**
+   * The below method copies the complete segment folder to the trash folder. Provide the
+   * necessary timestamp and the segment number in the suffixToAdd variable, so that the proper
+   * folder is created in the trash folder.
+   *
+   * @param carbonTablePath table path of the carbon table
+   * @param path the folder which is to be moved to the trash folder
+   * @param suffixToAdd timestamp, partition folder (if any) and segment number
+   */
+  public static void copyDataToTrashBySegment(CarbonFile path, String carbonTablePath,
+      String suffixToAdd) {
+    String trashFolderPath = CarbonTablePath.getTrashFolderPath(carbonTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd;
+    try {
+      FileUtils.copyDirectory(new File(path.getAbsolutePath()), new File(trashFolderPath));
+      LOGGER.info("Segment: " + path.getAbsolutePath() + " has been copied to the trash folder" +
+          " successfully");
+    } catch (IOException e) {
+      LOGGER.error("Unable to create the trash folder and copy data to it", e);
+    }
+  }
+
+  /**
+   * The below method deletes timestamp subdirectories in the trash folder which have expired as
+   * per the user defined expiration time
+   */
+  public static void deleteAllDataFromTrashFolderByTimeStamp(String carbonTablePath, Long timeStamp)
+          throws IOException {
+    String pathOfTrashFolder = CarbonTablePath.getTrashFolderPath(carbonTablePath);
+    // Delete the timestamp-based subdirectories in the trash folder as per the given expiration time.
+    if (FileFactory.isFileExist(pathOfTrashFolder)) {
+      try {
+        List<CarbonFile> carbonFileList = FileFactory.getFolderList(pathOfTrashFolder);
+        for (CarbonFile carbonFile : carbonFileList) {
+          String[] splitPath = carbonFile.getAbsolutePath().split(CarbonCommonConstants
+            .FILE_SEPARATOR);
+          Long currentTime = Long.valueOf(new Timestamp(System.currentTimeMillis()).getTime());
+          Long givenTime = Long.valueOf(splitPath[splitPath.length - 1]);

Review comment:
       long currentTime = new Timestamp(System.currentTimeMillis()).getTime();
       long givenTime = Long.parseLong(.......);

       move the currentTime computation (line 108) outside of the for loop, or add it as a parameter

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanUtil.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.index.IndexStoreManager;
+import org.apache.carbondata.core.index.Segment;
+import org.apache.carbondata.core.index.TableIndex;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the code used in the clean files command, to delete the load folders and move the
+ * data to the trash folder
+ */
+public final class CleanUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonUtil.class.getName());
+
+  public static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable,
+      LoadMetadataDetails[] newAddedLoadHistoryList, boolean isForceDelete,
+      List<PartitionSpec> specs) {
+    LoadMetadataDetails[] currentDetails =
+        SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
+    physicalFactAndMeasureMetadataDeletion(carbonTable,
+        currentDetails,
+        isForceDelete,
+        specs,
+        currentDetails);
+    if (newAddedLoadHistoryList != null && newAddedLoadHistoryList.length > 0) {
+      physicalFactAndMeasureMetadataDeletion(carbonTable,
+          newAddedLoadHistoryList,
+          isForceDelete,
+          specs,
+          currentDetails);
+    }
+  }
+
+  /**
+   * Delete the invalid data physically from table.
+   * @param carbonTable table
+   * @param loadDetails Load details which need clean up
+   * @param isForceDelete is Force delete requested by user
+   * @param specs Partition specs
+   * @param currLoadDetails Current table status load details which are required for update manager.
+   */
+  private static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable,
+      LoadMetadataDetails[] loadDetails, boolean isForceDelete, List<PartitionSpec> specs,
+      LoadMetadataDetails[] currLoadDetails) {
+    List<TableIndex> indexes = new ArrayList<>();
+    try {
+      for (TableIndex index : IndexStoreManager.getInstance().getAllCGAndFGIndexes(carbonTable)) {
+        if (index.getIndexSchema().isIndex()) {
+          indexes.add(index);
+        }
+      }
+    } catch (IOException e) {
+      LOGGER.warn(String.format(
+          "Failed to get indexes for %s.%s, therefore the index files could not be cleaned.",
+          carbonTable.getAbsoluteTableIdentifier().getDatabaseName(),
+          carbonTable.getAbsoluteTableIdentifier().getTableName()));
+    }
+    SegmentUpdateStatusManager updateStatusManager =
+        new SegmentUpdateStatusManager(carbonTable, currLoadDetails);
+    for (final LoadMetadataDetails oneLoad : loadDetails) {
+      if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) {
+        try {
+          if (oneLoad.getSegmentFile() != null) {
+            SegmentFileStore.deleteSegment(carbonTable.getAbsoluteTableIdentifier().getTablePath(),
+                new Segment(oneLoad.getLoadName(), oneLoad.getSegmentFile()), specs,
+                updateStatusManager);
+          } else {
+            String path = getSegmentPath(carbonTable.getAbsoluteTableIdentifier(), oneLoad);
+            boolean status = false;
+            if (FileFactory.isFileExist(path)) {
+              CarbonFile file = FileFactory.getCarbonFile(path);
+              CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() {
+
+                @Override
+                public boolean accept(CarbonFile file) {
+                  return (CarbonTablePath.isCarbonDataFile(file.getName()) ||
+                    CarbonTablePath.isCarbonIndexFile(file.getName()));
+                }
+              });
+
+              //if there are no fact and msr metadata files present then no need to keep
+              //entry in metadata.
+              if (filesToBeDeleted.length == 0) {
+                status = true;
+              } else {
+                for (CarbonFile eachFile : filesToBeDeleted) {
+                  if (!eachFile.delete()) {
+                    LOGGER.warn("Unable to delete the file as per delete command " + eachFile
+                        .getAbsolutePath());

Review comment:
       avoid using getAbsolutePath:
       path + "/" + eachFile.getName

##########
File path: core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.CarbonFileException;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the trash folder in carbondata. This class has methods to copy data to the trash and
+ * remove data from the trash.
+ */
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+          LogServiceFactory.getLogService(CarbonUtil.class.getName());
+
+  /**
+   * The below method copies a complete file to the trash folder. Provide the necessary
+   * timestamp and the segment number in the suffixToAdd variable, so that the proper folder is
+   * created in the trash folder.
+   *
+   * @param carbonTablePath table path of the carbon table
+   * @param pathOfFileToCopy the file which is to be moved to the trash folder
+   * @param suffixToAdd timestamp, partition folder (if any) and segment number
+   */
+  public static void copyDataToTrashFolderByFile(String carbonTablePath, String pathOfFileToCopy,
+      String suffixToAdd) {
+    String trashFolderPath = CarbonTablePath.getTrashFolderPath(carbonTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd;
+    try {
+      if (new File(pathOfFileToCopy).exists()) {
+        FileUtils.copyFileToDirectory(new File(pathOfFileToCopy), new File(trashFolderPath));
+        LOGGER.info("File: " + pathOfFileToCopy + " successfully copied to the trash folder: "
+                + trashFolderPath);
+      }
+    } catch (IOException e) {
+      LOGGER.error("Unable to copy " + pathOfFileToCopy + " to the trash folder", e);
+    }
+  }
+
+  /**
+   * The below method copies the complete segment folder to the trash folder. Provide the
+   * necessary timestamp and the segment number in the suffixToAdd variable, so that the proper
+   * folder is created in the trash folder.
+   *
+   * @param carbonTablePath table path of the carbon table
+   * @param path the folder which is to be moved to the trash folder
+   * @param suffixToAdd timestamp, partition folder (if any) and segment number
+   */
+  public static void copyDataToTrashBySegment(CarbonFile path, String carbonTablePath,
+      String suffixToAdd) {
+    String trashFolderPath = CarbonTablePath.getTrashFolderPath(carbonTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd;
+    try {
+      FileUtils.copyDirectory(new File(path.getAbsolutePath()), new File(trashFolderPath));
+      LOGGER.info("Segment: " + path.getAbsolutePath() + " has been copied to the trash folder" +
+          " successfully");
+    } catch (IOException e) {
+      LOGGER.error("Unable to create the trash folder and copy data to it", e);
+    }
+  }
+
+  /**
+   * The below method deletes timestamp subdirectories in the trash folder which have expired as
+   * per the user defined expiration time
+   */
+  public static void deleteAllDataFromTrashFolderByTimeStamp(String carbonTablePath, Long timeStamp)
+          throws IOException {
+    String pathOfTrashFolder = CarbonTablePath.getTrashFolderPath(carbonTablePath);
+    // Delete the timestamp-based subdirectories in the trash folder as per the given expiration time.
+    if (FileFactory.isFileExist(pathOfTrashFolder)) {
+      try {
+        List<CarbonFile> carbonFileList = FileFactory.getFolderList(pathOfTrashFolder);
+        for (CarbonFile carbonFile : carbonFileList) {
+          String[] splitPath = carbonFile.getAbsolutePath().split(CarbonCommonConstants
+            .FILE_SEPARATOR);
+          Long currentTime = Long.valueOf(new Timestamp(System.currentTimeMillis()).getTime());
+          Long givenTime = Long.valueOf(splitPath[splitPath.length - 1]);
+          // If the timestamp subdirectory has expired as per the user defined value,
+          // delete the complete timestamp subdirectory
+          if (givenTime + timeStamp < currentTime) {
+            deleteDataFromTrashFolderByFile(carbonFile);
+          } else {
+            LOGGER.info("Timestamp folder not expired: " + carbonFile.getAbsolutePath());
+          }
+        }
+      } catch (IOException e) {
+        LOGGER.error("Error during deleting from trash folder", e);
+      }
+    }
+  }
+
+  /**
+   * The below method deletes all the files and folders in the trash folder of a carbon table.
+   */
+  public static void deleteAllDataFromTrashFolder(String carbonTablePath)
+          throws IOException {
+    String pathOfTrashFolder = CarbonTablePath.getTrashFolderPath(carbonTablePath);
+    // if the trash folder exists delete the contents of the trash folder
+    if (FileFactory.isFileExist(pathOfTrashFolder)) {
+      try {
+        List<CarbonFile> carbonFileList = FileFactory.getFolderList(pathOfTrashFolder);
+        for (CarbonFile carbonFile : carbonFileList) {
+          deleteDataFromTrashFolderByFile(carbonFile);
+        }
+      } catch (IOException e) {
+        LOGGER.error("Error during deleting from trash folder", e);
+      }
+    }
+  }
+
+  /**
+   * The below method deletes by carbonFiles, if it is a directory, it will delete recursively
+   */
+  private static void deleteDataFromTrashFolderByFile(CarbonFile carbonFile) {
+    try {
+      FileFactory.deleteAllCarbonFilesOfDir(carbonFile);
+      LOGGER.info("delete file from trash+ " + carbonFile.getPath());
+    } catch (CarbonFileException e) {
+      LOGGER.error("Error during deleting from trash folder", e);
+    }
+  }
+
+  /**
+   * The below method will list all segments in the trash folder
+   */
+  public static List<String> listSegmentsInTrashFolder(String carbonTablePath)
+          throws IOException {
+    String pathOfTrashFolder = CarbonTablePath.getTrashFolderPath(carbonTablePath);
+    List<String> segmentsList = new ArrayList<String>();
+    if (FileFactory.isFileExist(pathOfTrashFolder)) {
+      try {
+        List<CarbonFile> timeStampList = FileFactory.getFolderList(pathOfTrashFolder);
+        for (CarbonFile carbonFile : timeStampList) {
+          List<CarbonFile> SegmentList = FileFactory.getFolderList(carbonFile.getAbsolutePath());

Review comment:
       add new method: FileFactory.getFolderList(carbonFile)

       change "SegmentList" to segmentList

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanUtil.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.index.IndexStoreManager;
+import org.apache.carbondata.core.index.Segment;
+import org.apache.carbondata.core.index.TableIndex;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the code used in the clean files command, to delete the load folders and move the
+ * data to the trash folder
+ */
+public final class CleanUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonUtil.class.getName());
+
+  public static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable,
+      LoadMetadataDetails[] newAddedLoadHistoryList, boolean isForceDelete,
+      List<PartitionSpec> specs) {
+    LoadMetadataDetails[] currentDetails =
+        SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
+    physicalFactAndMeasureMetadataDeletion(carbonTable,
+        currentDetails,
+        isForceDelete,
+        specs,
+        currentDetails);
+    if (newAddedLoadHistoryList != null && newAddedLoadHistoryList.length > 0) {
+      physicalFactAndMeasureMetadataDeletion(carbonTable,
+          newAddedLoadHistoryList,
+          isForceDelete,
+          specs,
+          currentDetails);
+    }
+  }
+
+  /**
+   * Delete the invalid data physically from table.
+   * @param carbonTable table
+   * @param loadDetails Load details which need clean up
+   * @param isForceDelete is Force delete requested by user
+   * @param specs Partition specs
+   * @param currLoadDetails Current table status load details which are required for update manager.
+   */
+  private static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable,
+      LoadMetadataDetails[] loadDetails, boolean isForceDelete, List<PartitionSpec> specs,
+      LoadMetadataDetails[] currLoadDetails) {
+    List<TableIndex> indexes = new ArrayList<>();
+    try {
+      for (TableIndex index : IndexStoreManager.getInstance().getAllCGAndFGIndexes(carbonTable)) {
+        if (index.getIndexSchema().isIndex()) {
+          indexes.add(index);
+        }
+      }
+    } catch (IOException e) {
+      LOGGER.warn(String.format(
+          "Failed to get indexes for %s.%s, therefore the index files could not be cleaned.",
+          carbonTable.getAbsoluteTableIdentifier().getDatabaseName(),
+          carbonTable.getAbsoluteTableIdentifier().getTableName()));
+    }
+    SegmentUpdateStatusManager updateStatusManager =
+        new SegmentUpdateStatusManager(carbonTable, currLoadDetails);
+    for (final LoadMetadataDetails oneLoad : loadDetails) {
+      if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) {
+        try {
+          if (oneLoad.getSegmentFile() != null) {
+            SegmentFileStore.deleteSegment(carbonTable.getAbsoluteTableIdentifier().getTablePath(),
+                new Segment(oneLoad.getLoadName(), oneLoad.getSegmentFile()), specs,
+                updateStatusManager);
+          } else {
+            String path = getSegmentPath(carbonTable.getAbsoluteTableIdentifier(), oneLoad);
+            boolean status = false;
+            if (FileFactory.isFileExist(path)) {
+              CarbonFile file = FileFactory.getCarbonFile(path);
+              CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() {
+
+                @Override
+                public boolean accept(CarbonFile file) {
+                  return (CarbonTablePath.isCarbonDataFile(file.getName()) ||
+                    CarbonTablePath.isCarbonIndexFile(file.getName()));
+                }
+              });
+
+              //if there are no fact and msr metadata files present then no need to keep
+              //entry in metadata.
+              if (filesToBeDeleted.length == 0) {
+                status = true;
+              } else {
+                for (CarbonFile eachFile : filesToBeDeleted) {
+                  if (!eachFile.delete()) {
+                    LOGGER.warn("Unable to delete the file as per delete command " + eachFile
+                        .getAbsolutePath());
+                    status = false;
+                  } else {
+                    status = true;
+                  }
+                }
+              }
+              // need to delete the complete folder.
+              if (status) {
+                if (!file.delete()) {
+                  LOGGER.warn("Unable to delete the folder as per delete command " + file
+                      .getAbsolutePath());

Review comment:
       use path directly

##########
File path: docs/dml-of-carbondata.md
##########
@@ -27,6 +27,7 @@ CarbonData DML statements are documented here,which includes:
 * [UPDATE AND DELETE](#update-and-delete)
 * [COMPACTION](#compaction)
 * [SEGMENT MANAGEMENT](./segment-management-on-carbondata.md)
+* [CLEAN FILES](./cleanfiles.md)

Review comment:
       change file name to clean-files.md

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import java.sql.Timestamp
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableClean <true> and cleans garbage segments
+   * (MARKED_FOR_DELETE state) if forceTableClean <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> for force clean, it will delete all data
+   *                               <false> it will clean garbage segments (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive partition details
+   */
+  def cleanFiles(
+    dbName: String,
+    tableName: String,
+    tablePath: String,
+    carbonTable: CarbonTable,
+    forceTableClean: Boolean,
+    currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+    truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)
+        }
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+        currentTablePartitions match {
+          case Some(partitions) =>
+            SegmentFileStore.cleanSegments(
+              carbonTable,
+              currentTablePartitions.map(_.asJava).orNull,
+              true)
+          case _ =>
+        }
+      }
+    } finally {
+      if (currentTablePartitions.equals(None)) {
+        cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec])
+      } else {
+        cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList)
+      }
+
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+
+  /**
+   * delete partition folders recursively
+   *
+   * @param carbonTable
+   * @param partitionSpecList
+   */
+  def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable,
+      partitionSpecList: List[PartitionSpec]): Unit = {
+    if (carbonTable != null && carbonTable.isHivePartitionTable) {
+      val loadMetadataDetails = SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetadataPath)
+
+      val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath)
+
+      // list all files from table path
+      val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
+      loadMetadataDetails.foreach { metadataDetail =>
+        if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
+          metadataDetail.getSegmentFile == null) {
+          val loadStartTime: Long = metadataDetail.getLoadStartTime
+          // delete all files of @loadStartTime from table path
+          cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime)
+          partitionSpecList.foreach {
+            partitionSpec =>
+              val partitionLocation = partitionSpec.getLocation
+              // For partition folder outside the tablePath
+              if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
+                val partitionCarbonFile = FileFactory
+                  .getCarbonFile(partitionLocation.toString)
+                // list all files from partitionLocation
+                val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true)
+                // delete all files of @loadStartTime from externalPath
+                cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime)
+              }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Compare CarbonFile Timestamp and delete files.
+   *
+   * @param carbonFiles
+   * @param timestamp
+   */
+  private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],
+      timestamp: Long): Unit = {
+    carbonFiles.asScala.foreach { carbonFile =>
+        val filePath = carbonFile.getPath
+        val fileName = carbonFile.getName
+        if (CarbonTablePath.DataFileUtil.compareCarbonFileTimeStamp(fileName, timestamp)) {
+          FileFactory.deleteFile(filePath)
+        }
+    }
+  }
+
+  /**
+   * The in-progress segments which are in a stale state will be marked as deleted
+   * when the driver is initializing.
+   *
+   * @param databaseLocation
+   * @param dbName
+   */
+  def cleanInProgressSegments(databaseLocation: String, dbName: String): Unit = {
+    val loaderDriver = CarbonProperties.getInstance().
+      getProperty(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER,
+        CarbonCommonConstants.DATA_MANAGEMENT_DRIVER_DEFAULT).toBoolean
+    if (!loaderDriver) {
+      return
+    }
+    try {
+      if (FileFactory.isFileExist(databaseLocation)) {
+        val file = FileFactory.getCarbonFile(databaseLocation)
+        if (file.isDirectory) {
+          val tableFolders = file.listFiles()
+          tableFolders.foreach { tableFolder =>
+            if (tableFolder.isDirectory) {
+              val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR +
+               tableFolder.getName
+              val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableFolder.getName)
+              val tableStatusFile =
+                CarbonTablePath.getTableStatusFilePath(tablePath)
+              if (FileFactory.isFileExist(tableStatusFile)) {
+                try {
+                  val carbonTable = CarbonMetadata.getInstance.getCarbonTable(tableUniqueName)
+                  SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true, null)
+                } catch {
+                  case _: Exception =>
+                    LOGGER.warn(s"Error while cleaning table " + s"$tableUniqueName")
+                }
+              }
+            }
+          }
+        }
+      }
+    } catch {
+      case s: java.io.FileNotFoundException =>
+        LOGGER.error(s)
+    }
+  }
+
+  /**
+   * The below method deletes all the files and folders in the trash folders of all carbon tables
+   * in all databases
+   */
+  def deleteDataFromTrashFolderInAllTables(sparkSession: SparkSession): Unit = {

Review comment:
       if it is unused, please remove it

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import java.sql.Timestamp
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableClean is <true> and cleans garbage segments
+   * (MARKED_FOR_DELETE state) if forceTableClean is <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> for force clean it will delete all data
+   *                               <false> it will clean garbage segment (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive Partitions  details
+   */
+  def cleanFiles(
+    dbName: String,
+    tableName: String,
+    tablePath: String,
+    carbonTable: CarbonTable,
+    forceTableClean: Boolean,
+    currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+    truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)
+        }
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+        currentTablePartitions match {
+          case Some(partitions) =>
+            SegmentFileStore.cleanSegments(
+              carbonTable,
+              currentTablePartitions.map(_.asJava).orNull,
+              true)
+          case _ =>
+        }
+      }
+    } finally {
+      if (currentTablePartitions.isEmpty) {
+        cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec])
+      } else {
+        cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList)
+      }
+
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+
+  /**
+   * delete partition folders recursively
+   *
+   * @param carbonTable
+   * @param partitionSpecList
+   */
+  def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable,
+      partitionSpecList: List[PartitionSpec]): Unit = {
+    if (carbonTable != null && carbonTable.isHivePartitionTable) {
+      val loadMetadataDetails = SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetadataPath)
+
+      val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath)
+
+      // list all files from table path
+      val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
+      loadMetadataDetails.foreach { metadataDetail =>
+        if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
+          metadataDetail.getSegmentFile == null) {
+          val loadStartTime: Long = metadataDetail.getLoadStartTime
+          // delete all files of @loadStartTime from table path
+          cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime)
+          partitionSpecList.foreach {
+            partitionSpec =>
+              val partitionLocation = partitionSpec.getLocation
+              // For partition folder outside the tablePath
+              if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
+                val partitionCarbonFile = FileFactory
+                  .getCarbonFile(partitionLocation.toString)
+                // list all files from partitionLocation
+                val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true)
+                // delete all files of @loadStartTime from externalPath
+                cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime)
+              }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Compare CarbonFile Timestamp and delete files.
+   *
+   * @param carbonFiles
+   * @param timestamp
+   */
+  private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],
+      timestamp: Long): Unit = {
+    carbonFiles.asScala.foreach { carbonFile =>
+        val filePath = carbonFile.getPath
+        val fileName = carbonFile.getName
+        if (CarbonTablePath.DataFileUtil.compareCarbonFileTimeStamp(fileName, timestamp)) {
+          FileFactory.deleteFile(filePath)
+        }
+    }
+  }
+
+  /**
+   * The in-progress segments which are in a stale state will be marked as deleted
+   * when the driver is initializing.
+   *
+   * @param databaseLocation
+   * @param dbName
+   */
+  def cleanInProgressSegments(databaseLocation: String, dbName: String): Unit = {
+    val loaderDriver = CarbonProperties.getInstance().
+      getProperty(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER,
+        CarbonCommonConstants.DATA_MANAGEMENT_DRIVER_DEFAULT).toBoolean
+    if (!loaderDriver) {
+      return
+    }
+    try {
+      if (FileFactory.isFileExist(databaseLocation)) {
+        val file = FileFactory.getCarbonFile(databaseLocation)
+        if (file.isDirectory) {
+          val tableFolders = file.listFiles()
+          tableFolders.foreach { tableFolder =>
+            if (tableFolder.isDirectory) {
+              val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR +
+               tableFolder.getName
+              val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableFolder.getName)
+              val tableStatusFile =
+                CarbonTablePath.getTableStatusFilePath(tablePath)
+              if (FileFactory.isFileExist(tableStatusFile)) {
+                try {
+                  val carbonTable = CarbonMetadata.getInstance.getCarbonTable(tableUniqueName)
+                  SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true, null)
+                } catch {
+                  case _: Exception =>
+                    LOGGER.warn(s"Error while cleaning table " + s"$tableUniqueName")
+                }
+              }
+            }
+          }
+        }
+      }
+    } catch {
+      case s: java.io.FileNotFoundException =>
+        LOGGER.error(s)
+    }
+  }
+
+  /**
+   * The below method deletes all the files and folders in the trash folders of all carbon tables
+   * in all databases
+   */
+  def deleteDataFromTrashFolderInAllTables(sparkSession: SparkSession): Unit = {
+    try {
+      val databases = sparkSession.sessionState.catalog.listDatabases()
+      databases.foreach(dbName => {
+        val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
+        if (FileFactory.isFileExist(databaseLocation)) {
+          val file = FileFactory.getCarbonFile(databaseLocation)
+          if (file.isDirectory) {
+            val tableFolders = file.listFiles()
+            tableFolders.foreach { tableFolder =>
+              if (tableFolder.isDirectory) {
+                val tablePath = databaseLocation +
+                  CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+                TrashUtil.deleteAllDataFromTrashFolder(tablePath)
+              }
+            }
+          }
+        }
+      })
+    } catch {
+      case e: Throwable =>
+        // catch all exceptions to avoid failure
+        LOGGER.error("Failed to clear trash folder of all tables", e)
+    }
+  }
+  /**
+   * The below method deletes all the files and folders in the trash folder of a carbon table
+   * and all its index tables
+   */
+  def deleteDataFromTrashFolder(carbonTable: CarbonTable, sparkSession: SparkSession): Unit = {
+    TrashUtil.deleteAllDataFromTrashFolder(carbonTable.getTablePath)
+    // check for index tables
+    val indexTables = CarbonIndexUtil
+      .getIndexCarbonTables(carbonTable, sparkSession)
+    indexTables.foreach { indexTable =>
+      TrashUtil.deleteAllDataFromTrashFolder(indexTable.getTablePath)
+    }
+  }
+
+  /**
+   * The below method deletes the timestamp sub directory in the trash folder based on the
+   * expiration day
+   */
+  def deleteDataFromTrashFolderByTimeStamp(carbonTable: CarbonTable, sparkSession:

Review comment:
       deleteStaleDataFromTrash

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import java.sql.Timestamp
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableClean is <true> and cleans garbage segments
+   * (MARKED_FOR_DELETE state) if forceTableClean is <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> for force clean it will delete all data
+   *                               <false> it will clean garbage segment (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive Partitions  details
+   */
+  def cleanFiles(
+    dbName: String,
+    tableName: String,
+    tablePath: String,
+    carbonTable: CarbonTable,
+    forceTableClean: Boolean,
+    currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+    truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)
+        }
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+        currentTablePartitions match {
+          case Some(partitions) =>
+            SegmentFileStore.cleanSegments(
+              carbonTable,
+              currentTablePartitions.map(_.asJava).orNull,
+              true)
+          case _ =>
+        }
+      }
+    } finally {
+      if (currentTablePartitions.isEmpty) {
+        cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec])
+      } else {
+        cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList)
+      }
+
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+
+  /**
+   * delete partition folders recursively
+   *
+   * @param carbonTable
+   * @param partitionSpecList
+   */
+  def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable,
+      partitionSpecList: List[PartitionSpec]): Unit = {
+    if (carbonTable != null && carbonTable.isHivePartitionTable) {
+      val loadMetadataDetails = SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetadataPath)
+
+      val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath)
+
+      // list all files from table path
+      val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
+      loadMetadataDetails.foreach { metadataDetail =>
+        if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
+          metadataDetail.getSegmentFile == null) {
+          val loadStartTime: Long = metadataDetail.getLoadStartTime
+          // delete all files of @loadStartTime from table path
+          cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime)
+          partitionSpecList.foreach {
+            partitionSpec =>
+              val partitionLocation = partitionSpec.getLocation
+              // For partition folder outside the tablePath
+              if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
+                val partitionCarbonFile = FileFactory
+                  .getCarbonFile(partitionLocation.toString)
+                // list all files from partitionLocation
+                val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true)
+                // delete all files of @loadStartTime from externalPath
+                cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime)
+              }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Compare CarbonFile Timestamp and delete files.
+   *
+   * @param carbonFiles
+   * @param timestamp
+   */
+  private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],
+      timestamp: Long): Unit = {
+    carbonFiles.asScala.foreach { carbonFile =>
+        val filePath = carbonFile.getPath
+        val fileName = carbonFile.getName
+        if (CarbonTablePath.DataFileUtil.compareCarbonFileTimeStamp(fileName, timestamp)) {
+          FileFactory.deleteFile(filePath)
+        }
+    }
+  }
+
+  /**
+   * The in-progress segments which are in a stale state will be marked as deleted
+   * when the driver is initializing.
+   *
+   * @param databaseLocation
+   * @param dbName
+   */
+  def cleanInProgressSegments(databaseLocation: String, dbName: String): Unit = {
+    val loaderDriver = CarbonProperties.getInstance().
+      getProperty(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER,
+        CarbonCommonConstants.DATA_MANAGEMENT_DRIVER_DEFAULT).toBoolean
+    if (!loaderDriver) {
+      return
+    }
+    try {
+      if (FileFactory.isFileExist(databaseLocation)) {
+        val file = FileFactory.getCarbonFile(databaseLocation)
+        if (file.isDirectory) {
+          val tableFolders = file.listFiles()
+          tableFolders.foreach { tableFolder =>
+            if (tableFolder.isDirectory) {
+              val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR +
+               tableFolder.getName
+              val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableFolder.getName)
+              val tableStatusFile =
+                CarbonTablePath.getTableStatusFilePath(tablePath)
+              if (FileFactory.isFileExist(tableStatusFile)) {
+                try {
+                  val carbonTable = CarbonMetadata.getInstance.getCarbonTable(tableUniqueName)
+                  SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true, null)
+                } catch {
+                  case _: Exception =>
+                    LOGGER.warn(s"Error while cleaning table " + s"$tableUniqueName")
+                }
+              }
+            }
+          }
+        }
+      }
+    } catch {
+      case s: java.io.FileNotFoundException =>
+        LOGGER.error(s)
+    }
+  }
+
+  /**
+   * The below method deletes all the files and folders in the trash folders of all carbon tables
+   * in all databases
+   */
+  def deleteDataFromTrashFolderInAllTables(sparkSession: SparkSession): Unit = {
+    try {
+      val databases = sparkSession.sessionState.catalog.listDatabases()
+      databases.foreach(dbName => {
+        val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
+        if (FileFactory.isFileExist(databaseLocation)) {
+          val file = FileFactory.getCarbonFile(databaseLocation)
+          if (file.isDirectory) {
+            val tableFolders = file.listFiles()
+            tableFolders.foreach { tableFolder =>
+              if (tableFolder.isDirectory) {
+                val tablePath = databaseLocation +
+                  CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+                TrashUtil.deleteAllDataFromTrashFolder(tablePath)
+              }
+            }
+          }
+        }
+      })
+    } catch {
+      case e: Throwable =>
+        // catch all exceptions to avoid failure
+        LOGGER.error("Failed to clear trash folder of all tables", e)
+    }
+  }
+  /**
+   * The below method deletes all the files and folders in the trash folder of a carbon table
+   * and all its index tables
+   */
+  def deleteDataFromTrashFolder(carbonTable: CarbonTable, sparkSession: SparkSession): Unit = {
+    TrashUtil.deleteAllDataFromTrashFolder(carbonTable.getTablePath)
+    // check for index tables
+    val indexTables = CarbonIndexUtil
+      .getIndexCarbonTables(carbonTable, sparkSession)
+    indexTables.foreach { indexTable =>
+      TrashUtil.deleteAllDataFromTrashFolder(indexTable.getTablePath)
+    }
+  }
+
+  /**
+   * The below method deletes the timestamp sub directory in the trash folder based on the
+   * expiration day
+   */
+  def deleteDataFromTrashFolderByTimeStamp(carbonTable: CarbonTable, sparkSession:
+  SparkSession): Unit = {
+    val expirationDay = CarbonProperties.getInstance().getTrashFolderExpirationTime
+    TrashUtil.deleteAllDataFromTrashFolderByTimeStamp(carbonTable.getTablePath, expirationDay)
+    // check for index tables
+    val indexTables = CarbonIndexUtil
+      .getIndexCarbonTables(carbonTable, sparkSession)
+    indexTables.foreach { indexTable =>
+      TrashUtil.deleteAllDataFromTrashFolderByTimeStamp(indexTable.getTablePath, expirationDay)
+    }
+  }
+
+  /**
+   * Actual dry run operation. It will also do dry run for stale segments
+   */
+  def cleanFilesDryRunOp(carbonTable: CarbonTable, sparkSession: SparkSession): Seq[Row] = {

Review comment:
       dryRunCleanFiles

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableClean is <true> and cleans garbage segments
+   * (MARKED_FOR_DELETE state) if forceTableClean is <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> for force clean it will delete all data
+   *                               <false> it will clean garbage segment (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive Partitions  details
+   */
+  def cleanFiles(
+                  dbName: String,
+                  tableName: String,
+                  tablePath: String,
+                  carbonTable: CarbonTable,
+                  forceTableClean: Boolean,
+                  currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+                  truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)
+        }
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+        currentTablePartitions match {
+          case Some(partitions) =>
+            SegmentFileStore.cleanSegments(
+              carbonTable,
+              currentTablePartitions.map(_.asJava).orNull,
+              true)
+          case _ =>
+        }
+      }
+    } finally {
+      if (currentTablePartitions.isEmpty) {
+        cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec])
+      } else {
+        cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList)
+      }
+
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+
+  /**
+   * delete partition folders recursively
+   *
+   * @param carbonTable
+   * @param partitionSpecList
+   */
+  def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable,
+                                         partitionSpecList: List[PartitionSpec]): Unit = {
+    if (carbonTable != null && carbonTable.isHivePartitionTable) {
+      val loadMetadataDetails = SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetadataPath)
+
+      val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath)
+
+      // list all files from table path
+      val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
+      loadMetadataDetails.foreach { metadataDetail =>
+        if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
+          metadataDetail.getSegmentFile == null) {
+          val loadStartTime: Long = metadataDetail.getLoadStartTime
+          // delete all files of @loadStartTime from table path
+          cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime)
+          partitionSpecList.foreach {
+            partitionSpec =>
+              val partitionLocation = partitionSpec.getLocation
+              // For partition folder outside the tablePath
+              if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
+                val partitionCarbonFile = FileFactory
+                  .getCarbonFile(partitionLocation.toString)
+                // list all files from partitionLocation
+                val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true)
+                // delete all files of @loadStartTime from externalPath
+                cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime)
+              }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Compare CarbonFile Timestamp and delete files.
+   * @param carbonFiles
+   * @param timestamp
+   */
+  private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],
+                                       timestamp: Long): Unit = {
+    carbonFiles.asScala.foreach {
+      carbonFile =>
+        val filePath = carbonFile.getPath
+        val fileName = carbonFile.getName
+        if (CarbonTablePath.DataFileUtil.compareCarbonFileTimeStamp(fileName, timestamp)) {
+          // delete the file
+          FileFactory.deleteFile(filePath)
+        }
+    }
+  }
+
+  /**
+   * The in-progress segments which are in a stale state will be marked as deleted
+   * when the driver is initializing.
+   *
+   * @param databaseLocation
+   * @param dbName
+   */
+  def cleanInProgressSegments(databaseLocation: String, dbName: String): Unit = {
+    val loaderDriver = CarbonProperties.getInstance().
+      getProperty(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER,
+        CarbonCommonConstants.DATA_MANAGEMENT_DRIVER_DEFAULT).toBoolean
+    if (!loaderDriver) {
+      return
+    }
+    try {
+      if (FileFactory.isFileExist(databaseLocation)) {
+        val file = FileFactory.getCarbonFile(databaseLocation)
+        if (file.isDirectory) {
+          val tableFolders = file.listFiles()
+          tableFolders.foreach { tableFolder =>
+            if (tableFolder.isDirectory) {
+              val tablePath = databaseLocation +
+                CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+              val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableFolder.getName)
+              val tableStatusFile =
+                CarbonTablePath.getTableStatusFilePath(tablePath)
+              if (FileFactory.isFileExist(tableStatusFile)) {
+                try {
+                  val carbonTable = CarbonMetadata.getInstance
+                    .getCarbonTable(tableUniqueName)
+                  SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true, null)
+                } catch {
+                  case _: Exception =>
+                    LOGGER.warn(s"Error while cleaning table " +
+                      s"${tableUniqueName}")
+                }
+              }
+            }
+          }
+        }
+      }
+    } catch {
+      case s: java.io.FileNotFoundException =>
+        // the database location does not exist; nothing to clean
+        LOGGER.error(s)
+    }
+  }
+
+  /**
+   * The below method deletes all the files and folders in the trash folders of all carbon tables
+   * in all databases
+   */
+  def deleteDataFromTrashFolderInAllTables(sparkSession: SparkSession): Unit = {
+    try {
+      val databases = sparkSession.sessionState.catalog.listDatabases()
+      databases.foreach(dbName => {
+        val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
+        if (FileFactory.isFileExist(databaseLocation)) {
+          val file = FileFactory.getCarbonFile(databaseLocation)
+          if (file.isDirectory) {
+            val tableFolders = file.listFiles()
+            tableFolders.foreach { tableFolder =>
+              if (tableFolder.isDirectory) {
+                val tablePath = databaseLocation +
+                  CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+                TrashUtil.deleteAllDataFromTrashFolder(tablePath)
+              }
+            }
+          }
+        }
+      })
+    } catch {
+      case e: Throwable =>
+        // catch all exceptions to avoid failure
+        LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+          .error("Failed to clear trash folder of all tables", e)
+    }
+  }
+
+
+  def cleanFilesDryRunOp(carbonTable: CarbonTable, sparkSession: SparkSession): Seq[Row] = {
+    // dry run for clean files command
+    // Clean files will remove compacted, Marked_for_delete, Insert in progress(stale) segments.
+
+    val tableStatusLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(carbonTable.getDatabaseName)
+    carbonLoadModel.setTableName(carbonTable.getTableName)
+    carbonLoadModel.setTablePath(carbonTable.getTablePath)
+    try {
+      if (tableStatusLock.lockWithRetries()) {
+        val tableStatusFilePath = CarbonTablePath
+          .getTableStatusFilePath(carbonTable.getTablePath)
+        carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
+          .readTableStatusFile(tableStatusFilePath).toList.asJava)
+      } else {
+        throw new ConcurrentOperationException(carbonTable.getDatabaseName,
+          carbonTable.getTableName, "table status read", "clean files command dry run")
+      }
+    } finally {
+      tableStatusLock.unlock()
+    }
+    val finalSegments = new java.util.ArrayList[Row]()
+
+    carbonLoadModel.getLoadMetadataDetails.asScala.toList.foreach(loaddetails =>
+      if (loaddetails.getSegmentStatus == SegmentStatus.MARKED_FOR_DELETE
+        || loaddetails.getSegmentStatus == SegmentStatus.COMPACTED) {
+        finalSegments.add(Row(carbonTable.getTableName, loaddetails.getLoadName,
+          loaddetails.getSegmentStatus.toString, "DELETE"))
+      } else if (loaddetails.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS) {
+        // try getting segment lock, if the lock is available, then it is a stale segment
+        // and can be moved to trash
+        if (CarbonUtil.tryGettingSegmentLock(loaddetails, carbonTable.getAbsoluteTableIdentifier)) {
+          finalSegments.add(Row(carbonTable.getTableName, loaddetails.getLoadName,
+            loaddetails.getSegmentStatus.toString, "MOVE TO TRASH"))
+        }
+      })
+    finalSegments.asScala ++= dryRunStaleSegments(carbonTable, sparkSession)
+    finalSegments.asScala
+  }
+
+  def cleanFilesDryRun(carbonTable: CarbonTable, sparkSession: SparkSession): Seq[Row] = {

Review comment:
       read tablestatus two times
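To avoid reading the tablestatus file twice, one option is to parse it once per table and pass the details down. A minimal sketch, assuming a hypothetical dryRunOp overload that accepts pre-read LoadMetadataDetails (locking elided; the table status lock would still be taken around each read, as the PR does):

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.index.CarbonIndexUtil
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
    import org.apache.carbondata.core.util.path.CarbonTablePath

    object CleanFilesDryRunSketch {
      // hypothetical overload: operates on details that were already read,
      // so the tablestatus file is parsed exactly once per table
      def dryRunOp(table: CarbonTable, details: List[LoadMetadataDetails]): Seq[Row] =
        details.collect {
          case d if d.getSegmentStatus == SegmentStatus.MARKED_FOR_DELETE ||
                    d.getSegmentStatus == SegmentStatus.COMPACTED =>
            Row(table.getTableName, d.getLoadName, d.getSegmentStatus.toString, "DELETE")
        }

      def cleanFilesDryRun(carbonTable: CarbonTable, sparkSession: SparkSession): Seq[Row] = {
        // one tablestatus read for the main table
        val details = SegmentStatusManager
          .readTableStatusFile(CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
          .toList
        var res: Seq[Row] = dryRunOp(carbonTable, details)
        // one tablestatus read per index table, reused for every check on it
        CarbonIndexUtil.getIndexCarbonTables(carbonTable, sparkSession).foreach { index =>
          val indexDetails = SegmentStatusManager
            .readTableStatusFile(CarbonTablePath.getTableStatusFilePath(index.getTablePath))
            .toList
          res = res ++ dryRunOp(index, indexDetails)
        }
        res
      }
    }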

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import java.sql.Timestamp
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableClean is <true> and cleans garbage segments
+   * (MARKED_FOR_DELETE state) if forceTableClean is <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> for force clean it will delete all data
+   *                               <false> it will clean garbage segment (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive Partitions  details
+   */
+  def cleanFiles(
+    dbName: String,
+    tableName: String,
+    tablePath: String,
+    carbonTable: CarbonTable,
+    forceTableClean: Boolean,
+    currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+    truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)
+        }
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+        currentTablePartitions match {
+          case Some(partitions) =>
+            SegmentFileStore.cleanSegments(
+              carbonTable,
+              currentTablePartitions.map(_.asJava).orNull,
+              true)
+          case _ =>
+        }
+      }
+    } finally {
+      if (currentTablePartitions.isEmpty) {
+        cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec])
+      } else {
+        cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList)
+      }
+
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+
+  /**
+   * delete partition folders recursively
+   *
+   * @param carbonTable
+   * @param partitionSpecList
+   */
+  def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable,
+      partitionSpecList: List[PartitionSpec]): Unit = {
+    if (carbonTable != null && carbonTable.isHivePartitionTable) {
+      val loadMetadataDetails = SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetadataPath)
+
+      val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath)
+
+      // list all files from table path
+      val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
+      loadMetadataDetails.foreach { metadataDetail =>
+        if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
+          metadataDetail.getSegmentFile == null) {
+          val loadStartTime: Long = metadataDetail.getLoadStartTime
+          // delete all files of @loadStartTime from table path
+          cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime)
+          partitionSpecList.foreach {
+            partitionSpec =>
+              val partitionLocation = partitionSpec.getLocation
+              // For partition folder outside the tablePath
+              if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
+                val partitionCarbonFile = FileFactory
+                  .getCarbonFile(partitionLocation.toString)
+                // list all files from partitionLocation
+                val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true)
+                // delete all files of @loadStartTime from externalPath
+                cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime)
+              }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Compare CarbonFile Timestamp and delete files.
+   *
+   * @param carbonFiles
+   * @param timestamp
+   */
+  private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],
+      timestamp: Long): Unit = {
+    carbonFiles.asScala.foreach { carbonFile =>
+        val filePath = carbonFile.getPath
+        val fileName = carbonFile.getName
+        if (CarbonTablePath.DataFileUtil.compareCarbonFileTimeStamp(fileName, timestamp)) {
+          FileFactory.deleteFile(filePath)
+        }
+    }
+  }
+
+  /**
+   * The in-progress segments which are in a stale state will be marked as deleted
+   * when the driver is initializing.
+   *
+   * @param databaseLocation
+   * @param dbName
+   */
+  def cleanInProgressSegments(databaseLocation: String, dbName: String): Unit = {
+    val loaderDriver = CarbonProperties.getInstance().
+      getProperty(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER,
+        CarbonCommonConstants.DATA_MANAGEMENT_DRIVER_DEFAULT).toBoolean
+    if (!loaderDriver) {
+      return
+    }
+    try {
+      if (FileFactory.isFileExist(databaseLocation)) {
+        val file = FileFactory.getCarbonFile(databaseLocation)
+        if (file.isDirectory) {
+          val tableFolders = file.listFiles()
+          tableFolders.foreach { tableFolder =>
+            if (tableFolder.isDirectory) {
+              val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR +
+               tableFolder.getName
+              val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableFolder.getName)
+              val tableStatusFile =
+                CarbonTablePath.getTableStatusFilePath(tablePath)
+              if (FileFactory.isFileExist(tableStatusFile)) {
+                try {
+                  val carbonTable = CarbonMetadata.getInstance.getCarbonTable(tableUniqueName)
+                  SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true, null)
+                } catch {
+                  case _: Exception =>
+                    LOGGER.warn(s"Error while cleaning table " + s"$tableUniqueName")
+                }
+              }
+            }
+          }
+        }
+      }
+    } catch {
+      case s: java.io.FileNotFoundException =>
+        LOGGER.error(s)
+    }
+  }
+
+  /**
+   * The below method deletes all the files and folders in the trash folders of all carbon tables
+   * in all databases
+   */
+  def deleteDataFromTrashFolderInAllTables(sparkSession: SparkSession): Unit = {
+    try {
+      val databases = sparkSession.sessionState.catalog.listDatabases()
+      databases.foreach(dbName => {
+        val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
+        if (FileFactory.isFileExist(databaseLocation)) {
+          val file = FileFactory.getCarbonFile(databaseLocation)
+          if (file.isDirectory) {
+            val tableFolders = file.listFiles()
+            tableFolders.foreach { tableFolder =>
+              if (tableFolder.isDirectory) {
+                val tablePath = databaseLocation +
+                  CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+                TrashUtil.deleteAllDataFromTrashFolder(tablePath)
+              }
+            }
+          }
+        }
+      })
+    } catch {
+      case e: Throwable =>
+        // catch all exceptions to avoid failure
+        LOGGER.error("Failed to clear trash folder of all tables", e)
+    }
+  }
+  /**
+   * The below method deletes all the files and folders in the trash folder of a carbon table
+   * and all its index tables
+   */
+  def deleteDataFromTrashFolder(carbonTable: CarbonTable, sparkSession: SparkSession): Unit = {
+    TrashUtil.deleteAllDataFromTrashFolder(carbonTable.getTablePath)
+    // check for index tables
+    val indexTables = CarbonIndexUtil
+      .getIndexCarbonTables(carbonTable, sparkSession)
+    indexTables.foreach { indexTable =>
+      TrashUtil.deleteAllDataFromTrashFolder(indexTable.getTablePath)
+    }
+  }
+
+  /**
+   * The below method deletes the timestamp sub directory in the trash folder based on the
+   * expiration day
+   */
+  def deleteDataFromTrashFolderByTimeStamp(carbonTable: CarbonTable, sparkSession:
+  SparkSession): Unit = {
+    val expirationDay = CarbonProperties.getInstance().getTrashFolderExpirationTime
+    TrashUtil.deleteAllDataFromTrashFolderByTimeStamp(carbonTable.getTablePath, expirationDay)
+    // check for index tables
+    val indexTables = CarbonIndexUtil
+      .getIndexCarbonTables(carbonTable, sparkSession)
+    indexTables.foreach { indexTable =>
+      TrashUtil.deleteAllDataFromTrashFolderByTimeStamp(indexTable.getTablePath, expirationDay)
+    }
+  }
+
+  /**
+   * Actual dry run operation. It will also do dry run for stale segments
+   */
+  def cleanFilesDryRunOp(carbonTable: CarbonTable, sparkSession: SparkSession): Seq[Row] = {
+    // dry run for clean files command
+    // Clean files will remove compacted, Marked_for_delete, Insert in progress(stale) segments.
+    val tableStatusLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
+    var loadMetadataDetails = List[LoadMetadataDetails]()
+    try {
+      if (tableStatusLock.lockWithRetries()) {
+        val tableStatusFilePath = CarbonTablePath
+          .getTableStatusFilePath(carbonTable.getTablePath)
+        loadMetadataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFilePath).toList
+      } else {
+        throw new ConcurrentOperationException(carbonTable.getDatabaseName,
+          carbonTable.getTableName, "table status read", "clean files command dry run")
+      }
+    } finally {
+      tableStatusLock.unlock()
+    }
+
+    // no need to show the first and the last segments here if they are already deleted but their
+    // entry is in the tablestatus
+    val finalsSegments = new ListBuffer[Row]
+    loadMetadataDetails.foreach(loadDetails =>
+      if ((loadDetails.getSegmentStatus == SegmentStatus.MARKED_FOR_DELETE || loadDetails
+        .getSegmentStatus == SegmentStatus.COMPACTED)) {
+        if (loadDetails.getVisibility == null || (loadDetails.getVisibility != null && loadDetails
+          .getVisibility.toBoolean)) {
+          finalsSegments += Row(carbonTable.getTableName, loadDetails.getLoadName,
+            loadDetails.getSegmentStatus.toString, "FACT", "DELETE", "NULL")
+        }
+      } else if (loadDetails.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS && (loadDetails
+        .getVisibility == null || (loadDetails.getVisibility != null && loadDetails.getVisibility
+        .toBoolean))) {
+        // try getting segment lock, if the lock is available, then it is a stale segment
+        // and can be moved to trash
+        if (CarbonUtil.tryGettingSegmentLock(loadDetails, carbonTable.getAbsoluteTableIdentifier)) {
+          finalsSegments += Row(carbonTable.getTableName, loadDetails.getLoadName,
+            loadDetails.getSegmentStatus.toString, "FACT", "DELETE", "NULL")
+        }
+      })
+    finalsSegments ++= dryRunStaleSegments(carbonTable, sparkSession)
+    finalsSegments
+  }
+
+  /**
+   * Dry run operation for carbonTable and all its index tables
+   */
+  def cleanFilesDryRun(carbonTable: CarbonTable, sparkSession: SparkSession): Seq[Row] = {
+    var res = cleanFilesDryRunOp(carbonTable, sparkSession)
+    res ++= trashFolderDryRun(carbonTable)
+    // check for index tables
+    val indexTables = CarbonIndexUtil
+      .getIndexCarbonTables(carbonTable, sparkSession)
+    indexTables.foreach { indexTable =>
+      res ++= cleanFilesDryRunOp(indexTable, sparkSession)
+      res ++= trashFolderDryRun(indexTable)
+    }
+    res
+  }
+
+
+  /**
+   * Dry run operation for stale segments
+   */
+  def dryRunStaleSegments(carbonTable: CarbonTable, sparkSession: SparkSession): Seq[Row] = {
+    // stale segments are those segments whose entry is not present in the table status file, but
+    // those segments are in Fact folder and metadata folder
+
+    val metaDataLocation = carbonTable.getMetadataPath
+    val details = SegmentStatusManager.readLoadMetadata(metaDataLocation)
+    val finalSegments = new java.util.ArrayList[Row]()
+
+    val partitionPath = CarbonTablePath.getPartitionDir(carbonTable.getTablePath)
+    if (FileFactory.isFileExist(partitionPath)) {
+      val allSegments = FileFactory.getCarbonFile(partitionPath).listFiles
+      // there is no segment
+      if (allSegments == null || allSegments.isEmpty) return Seq.empty
+      // there is no segment or failed to read tablestatus file.
+      if (details == null || details.isEmpty) return Seq.empty
+      val staleSegments = TableProcessingOperations.getStaleSegments(details, allSegments, false)
+      staleSegments.asScala.foreach(
+        staleSegmentName => finalSegments.add(Row(carbonTable.getTableName, staleSegmentName._2,
+          "UNKNOWN", "FACT", "MOVE TO TRASH"))
+      )
+    } else {
+      // for partition table flow
+      val partitionFolderList : java.util.ArrayList[String] = new util.ArrayList()
+      val allFolders = FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles()
+      allFolders.toSeq.foreach( name =>
+        if (name.getName.contains(CarbonCommonConstants.EQUALS) ) {
+          partitionFolderList.add(name.getAbsolutePath)
+        })
+      val tableLoadDetails = details.map(detail => detail.getLoadName)
+      partitionFolderList.asScala.foreach(name =>
+        FileFactory.getCarbonFile(name).listFiles().foreach(fileName =>
+          if (fileName.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT) && !tableLoadDetails
+            .contains(fileName.getName.substring(0, fileName.getName.indexOf(CarbonCommonConstants
+              .UNDERSCORE)))) {
+            val filePath = fileName.getAbsolutePath.split(CarbonCommonConstants.FILE_SEPARATOR)
+            finalSegments.add(Row(carbonTable.getTableName, fileName.getName.substring(0, fileName
+              .getName.indexOf("_")), "UNKNOWN", filePath(filePath.length - 2), "MOVE TO TRASH"))
+          }
+        )
+      )
+    }
+    finalSegments.asScala
+  }
+
+  /**
+   * Dry run operation for trash folder
+   */
+  def trashFolderDryRun(carbonTable: CarbonTable): Seq[Row] = {

Review comment:
       will it return all content in trash?
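If the dry run is meant to report everything currently sitting in the trash as well, a listing along these lines could answer that. The ".Trash" directory name and the Row layout here are assumptions for illustration, not the PR's actual trash layout:

    import org.apache.spark.sql.Row
    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.datastore.impl.FileFactory
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable

    object TrashDryRunSketch {
      // assumption: trash data sits under <tablePath>/.Trash/<timestamp>/...
      def trashFolderDryRun(carbonTable: CarbonTable): Seq[Row] = {
        val trashPath = carbonTable.getTablePath +
          CarbonCommonConstants.FILE_SEPARATOR + ".Trash"
        if (!FileFactory.isFileExist(trashPath)) {
          return Seq.empty
        }
        // one row per timestamped sub-directory, so the dry run surfaces
        // all content that is currently in the trash folder
        FileFactory.getCarbonFile(trashPath).listFiles().toSeq.map { timestampDir =>
          Row(carbonTable.getTableName, timestampDir.getName, "IN TRASH", "DELETE FROM TRASH")
        }
      }
    }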

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
##########
@@ -48,13 +54,38 @@ import org.apache.carbondata.view.MVManagerInSpark
 case class CarbonCleanFilesCommand(
     databaseNameOp: Option[String],
     tableName: Option[String],
+    options: Option[List[(String, String)]],
     forceTableClean: Boolean = false,
     isInternalCleanCall: Boolean = false,
     truncateTable: Boolean = false)
   extends AtomicRunnableCommand {
 
   var carbonTable: CarbonTable = _
   var cleanFileCommands: List[CarbonCleanFilesCommand] = List.empty
+  val optionsMap = options.getOrElse(List.empty[(String, String)]).toMap
+  var isDryRun: Boolean = false
+  val dryRun = "isDryRun"
+  if (optionsMap.contains(dryRun.toLowerCase) ) {
+    isDryRun = Boolean.parseBoolean(optionsMap(dryRun.toLowerCase).toString)
+  }

Review comment:
       val isDryRun = optionsMap.getOrElse("isdryrun", "false").toBoolean

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
##########
@@ -48,13 +54,38 @@ import org.apache.carbondata.view.MVManagerInSpark
 case class CarbonCleanFilesCommand(
     databaseNameOp: Option[String],
     tableName: Option[String],
+    options: Option[List[(String, String)]],
     forceTableClean: Boolean = false,
     isInternalCleanCall: Boolean = false,
     truncateTable: Boolean = false)
   extends AtomicRunnableCommand {
 
   var carbonTable: CarbonTable = _
   var cleanFileCommands: List[CarbonCleanFilesCommand] = List.empty
+  val optionsMap = options.getOrElse(List.empty[(String, String)]).toMap
+  var isDryRun: Boolean = false
+  val dryRun = "isDryRun"
+  if (optionsMap.contains(dryRun.toLowerCase) ) {
+    isDryRun = Boolean.parseBoolean(optionsMap(dryRun.toLowerCase).toString)
+  }
+  var forceTrashClean: Boolean = false
+  if (optionsMap.contains("force") ) {
+    forceTrashClean = Boolean.parseBoolean(optionsMap("force").toString)
+  }

Review comment:
       simplify the code (see the sketch below)

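Applying that suggestion to both options gives a compact sketch (assuming the option keys arrive lower-cased in the map, as the lookups above imply):

    val optionsMap: Map[String, String] =
      options.getOrElse(List.empty[(String, String)]).toMap
    val isDryRun: Boolean = optionsMap.getOrElse("isdryrun", "false").toBoolean
    val forceTrashClean: Boolean = optionsMap.getOrElse("force", "false").toBoolean
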
##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
##########
@@ -80,40 +112,96 @@ case class CarbonCleanFilesCommand(
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    // if insert overwrite in progress, do not allow delete segment
-    if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
-      throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
-    }
-    val operationContext = new OperationContext
-    val cleanFilesPreEvent: CleanFilesPreEvent =
-      CleanFilesPreEvent(carbonTable,
-        sparkSession)
-    OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent, operationContext)
-    if (tableName.isDefined) {
-      Checker.validateTableExists(databaseNameOp, tableName.get, sparkSession)
-      if (forceTableClean) {
-        deleteAllData(sparkSession, databaseNameOp, tableName.get)
+    if (!isDryRun) {

Review comment:
       extract the dry run logic into a separate method (see the sketch below)

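A sketch of that extraction; the helper name processCleanFiles is hypothetical, and the dry run branch reuses the call already present in the PR:

    override def processData(sparkSession: SparkSession): Seq[Row] = {
      if (isDryRun) {
        // dry run: report only, clean nothing and leave the trash intact
        if (tableName.isDefined) {
          CleanFilesUtil.cleanFilesDryRun(carbonTable, sparkSession)
        } else {
          Seq.empty
        }
      } else {
        processCleanFiles(sparkSession)
        Seq.empty
      }
    }

    private def processCleanFiles(sparkSession: SparkSession): Unit = {
      // the existing non-dry-run body of processData moves here unchanged
    }
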
##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
##########
@@ -80,40 +112,96 @@ case class CarbonCleanFilesCommand(
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    // if insert overwrite in progress, do not allow delete segment
-    if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
-      throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
-    }
-    val operationContext = new OperationContext
-    val cleanFilesPreEvent: CleanFilesPreEvent =
-      CleanFilesPreEvent(carbonTable,
-        sparkSession)
-    OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent, operationContext)
-    if (tableName.isDefined) {
-      Checker.validateTableExists(databaseNameOp, tableName.get, sparkSession)
-      if (forceTableClean) {
-        deleteAllData(sparkSession, databaseNameOp, tableName.get)
+    if (!isDryRun) {
+      // if insert overwrite in progress, do not allow delete segment
+      if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
+        throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
+      }
+      val operationContext = new OperationContext
+      val cleanFilesPreEvent: CleanFilesPreEvent = CleanFilesPreEvent(carbonTable, sparkSession)
+      OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent, operationContext)
+      if (tableName.isDefined) {
+        Checker.validateTableExists(databaseNameOp, tableName.get, sparkSession)
+        if (forceTrashClean) {
+          CleanFilesUtil.deleteDataFromTrashFolder(carbonTable, sparkSession)
+        } else {
+          // clear trash based on timestamp
+          CleanFilesUtil.deleteDataFromTrashFolderByTimeStamp(carbonTable, sparkSession)
+        }
+        if (forceTableClean) {
+          deleteAllData(sparkSession, databaseNameOp, tableName.get)
+        } else {
+          cleanGarbageData(sparkSession, databaseNameOp, tableName.get)
+        }
+        // delete partial loads and move them to the trash folder
+        TableProcessingOperations
+          .deletePartialLoadDataIfExist(carbonTable, false)
+        // clean stale segment files in the metadata folder too
+        deleteStashInMetadataFolder(carbonTable)
       } else {
-        cleanGarbageData(sparkSession, databaseNameOp, tableName.get)
+        cleanGarbageDataInAllTables(sparkSession)
+      }
+      if (cleanFileCommands != null) {
+        cleanFileCommands.foreach(_.processData(sparkSession))
       }
+      val cleanFilesPostEvent: CleanFilesPostEvent =
+        CleanFilesPostEvent(carbonTable, sparkSession)
+      OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent, operationContext)
+      Seq.empty
+    } else if (isDryRun && tableName.isDefined) {
+      // dry run: do not clean anything and do not empty the trash either
+      CleanFilesUtil.cleanFilesDryRun(carbonTable, sparkSession)
     } else {
-      cleanGarbageDataInAllTables(sparkSession)
+      Seq.empty
     }
-    if (cleanFileCommands != null) {
-      cleanFileCommands.foreach(_.processData(sparkSession))
+  }
+
+  // This method deletes the stale segment files in the segment folder.
+  def deleteStashInMetadataFolder(carbonTable: CarbonTable): Unit = {

Review comment:
       change the name ("stash" reads like a typo for "stale"; see the sketch below)

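For instance, a name that matches the method's own doc comment; the exact name below is only a suggestion:

    // hypothetical rename: the method deletes stale segment files
    def deleteStaleSegmentFilesInMetadataFolder(carbonTable: CarbonTable): Unit = {
      // body unchanged from deleteStashInMetadataFolder
    }
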
##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
##########
@@ -80,40 +112,96 @@ case class CarbonCleanFilesCommand(
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    // if insert overwrite in progress, do not allow delete segment
-    if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
-      throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
-    }
-    val operationContext = new OperationContext
-    val cleanFilesPreEvent: CleanFilesPreEvent =
-      CleanFilesPreEvent(carbonTable,
-        sparkSession)
-    OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent, operationContext)
-    if (tableName.isDefined) {
-      Checker.validateTableExists(databaseNameOp, tableName.get, sparkSession)
-      if (forceTableClean) {
-        deleteAllData(sparkSession, databaseNameOp, tableName.get)
+    if (!isDryRun) {
+      // if insert overwrite in progress, do not allow delete segment
+      if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
+        throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
+      }
+      val operationContext = new OperationContext
+      val cleanFilesPreEvent: CleanFilesPreEvent = CleanFilesPreEvent(carbonTable, sparkSession)
+      OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent, operationContext)
+      if (tableName.isDefined) {
+        Checker.validateTableExists(databaseNameOp, tableName.get, sparkSession)
+        if (forceTrashClean) {
+          CleanFilesUtil.deleteDataFromTrashFolder(carbonTable, sparkSession)
+        } else {
+          // clear trash based on timestamp
+          CleanFilesUtil.deleteDataFromTrashFolderByTimeStamp(carbonTable, sparkSession)
+        }
+        if (forceTableClean) {
+          deleteAllData(sparkSession, databaseNameOp, tableName.get)
+        } else {
+          cleanGarbageData(sparkSession, databaseNameOp, tableName.get)
+        }
+        // delete partial loads and move them to the trash folder
+        TableProcessingOperations
+          .deletePartialLoadDataIfExist(carbonTable, false)
+        // clean stale segment files in the metadata folder too
+        deleteStashInMetadataFolder(carbonTable)
       } else {
-        cleanGarbageData(sparkSession, databaseNameOp, tableName.get)
+        cleanGarbageDataInAllTables(sparkSession)
+      }
+      if (cleanFileCommands != null) {
+        cleanFileCommands.foreach(_.processData(sparkSession))
       }
+      val cleanFilesPostEvent: CleanFilesPostEvent =
+        CleanFilesPostEvent(carbonTable, sparkSession)
+      OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent, operationContext)
+      Seq.empty
+    } else if (isDryRun && tableName.isDefined) {
+      // dry run: do not clean anything and do not empty the trash either
+      CleanFilesUtil.cleanFilesDryRun(carbonTable, sparkSession)
     } else {
-      cleanGarbageDataInAllTables(sparkSession)
+      Seq.empty
     }
-    if (cleanFileCommands != null) {
-      cleanFileCommands.foreach(_.processData(sparkSession))
+  }
+
+  // This method deletes the stale segment files in the segment folder.
+  def deleteStashInMetadataFolder(carbonTable: CarbonTable): Unit = {
+    val tableStatusLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
+    val carbonLoadModel = new CarbonLoadModel
+    try {
+      if (tableStatusLock.lockWithRetries()) {
+        val tableStatusFilePath = CarbonTablePath
+          .getTableStatusFilePath(carbonTable.getTablePath)
+        val loadMetaDataDetails = SegmentStatusManager
+          .readTableStatusFile(tableStatusFilePath).filter(details => details.getSegmentStatus ==

Review comment:
       please check how many times the tablestatus file is read during one clean files operation (see the sketch below)

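A sketch of reading tablestatus once per clean files run and passing the result down, instead of re-reading the file in every helper; the variable names are illustrative:

    val tableStatusFilePath =
      CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
    // single read of the tablestatus file
    val allDetails = SegmentStatusManager.readTableStatusFile(tableStatusFilePath)
    val inProgressDetails = allDetails.filter(detail =>
      detail.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS)
    // hand allDetails / inProgressDetails to the helpers instead of re-reading
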
##########
File path: processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
##########
@@ -149,9 +132,167 @@ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
       } finally {
         carbonTableStatusLock.unlock();
       }
+    } else {
+      int retryCount = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+          CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+      int maxTimeout = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+          CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);

Review comment:
       remove it

##########
File path: processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
##########
@@ -149,9 +132,167 @@ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
       } finally {
         carbonTableStatusLock.unlock();
       }
+    } else {
+      int retryCount = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+          CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+      int maxTimeout = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+          CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);

Review comment:
       remove the code

##########
File path: processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
##########
@@ -149,9 +132,167 @@ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
       } finally {
         carbonTableStatusLock.unlock();
       }
+    } else {
+      int retryCount = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+          CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+      int maxTimeout = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+          CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
+      ICarbonLock carbonTableStatusLock = CarbonLockFactory
+          .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK);
+
+      try {
+        if (carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout)) {

Review comment:
       use the default lockWithRetries() method without parameters (see the sketch below)

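The pattern the reviewer asks for, shown here in Scala with the lock API the PR already uses elsewhere: acquire with the parameterless lockWithRetries() and drop the custom retry/timeout lookup entirely.

    val tableStatusLock = CarbonLockFactory
      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
    try {
      if (tableStatusLock.lockWithRetries()) {
        // ... cleanup work that needs the table status lock ...
      }
    } finally {
      tableStatusLock.unlock()
    }
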
##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import java.sql.Timestamp
+import java.util
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableClean is <true> and cleans
+   * garbage segments (MARKED_FOR_DELETE state) if it is <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> force clean deletes all data;
+   *                               <false> cleans garbage segments (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive partition details
+   */
+  def cleanFiles(
+    dbName: String,
+    tableName: String,
+    tablePath: String,
+    carbonTable: CarbonTable,
+    forceTableClean: Boolean,
+    currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+    truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request failed for " +
+        s"$dbName.$tableName" +
+        ". Unable to acquire the clean files lock because another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)
+        }
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+        currentTablePartitions match {
+          case Some(partitions) =>
+            SegmentFileStore.cleanSegments(
+              carbonTable,
+              currentTablePartitions.map(_.asJava).orNull,
+              true)
+          case _ =>
+        }
+      }
+    } finally {
+      if (currentTablePartitions.isEmpty) {
+        cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec])
+      } else {
+        cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList)
+      }
+
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+  /**
+   * delete partition folders recursively
+   *
+   * @param carbonTable
+   * @param partitionSpecList
+   */
+  def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable,
+      partitionSpecList: List[PartitionSpec]): Unit = {
+    if (carbonTable != null && carbonTable.isHivePartitionTable) {
+      val loadMetadataDetails = SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetadataPath)
+
+      val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath)
+
+      // list all files from table path
+      val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
+      loadMetadataDetails.foreach { metadataDetail =>
+        if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
+          metadataDetail.getSegmentFile == null) {
+          val loadStartTime: Long = metadataDetail.getLoadStartTime
+          // delete all files of @loadStartTime from table path
+          cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime)
+          partitionSpecList.foreach {
+            partitionSpec =>
+              val partitionLocation = partitionSpec.getLocation
+              // For partition folder outside the tablePath
+              if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
+                val partitionCarbonFile = FileFactory
+                  .getCarbonFile(partitionLocation.toString)
+                // list all files from partitionLocation
+                val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true)
+                // delete all files of @loadStartTime from externalPath
+                cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime)
+              }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Compare CarbonFile Timestamp and delete files.
+   *
+   * @param carbonFiles
+   * @param timestamp
+   */
+  private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],

Review comment:
       CleanUtil also contains a method with the same functionality (a delegation sketch follows)

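If CleanUtil really does carry an equivalent helper, one option is to delegate rather than keep a second copy; the signature of CleanUtil.cleanCarbonFilesInFolder below is an assumption based on this remark, not a verified API:

    private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],
        timestamp: Long): Unit = {
      // assumed: CleanUtil exposes the same helper; delegate instead of duplicating
      CleanUtil.cleanCarbonFilesInFolder(carbonFiles, timestamp)
    }
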



----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]

