[GitHub] [carbondata] vikramahuja1001 opened a new pull request #3917: [WIP] clean files refactor

[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata

GitBox

CarbonDataQA1 commented on pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#issuecomment-717190630


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/2948/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


[GitHub] [carbondata] akashrn5 commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata

GitBox

akashrn5 commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r513171515



##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
##########
@@ -2116,6 +2087,26 @@ public int getMaxSIRepairLimit(String dbName, String tableName) {
     return Math.abs(Integer.parseInt(thresholdValue));
   }
 
+  /**
+   * The below method returns the microseconds after which the trash folder will expire
+   */
+  public long getTrashFolderExpirationTime() {
+    String configuredValue = getProperty(CarbonCommonConstants.CARBON_TRASH_EXPIRATION_DAYS,
+            CarbonCommonConstants.CARBON_TRASH_EXPIRATION_DAYS_DEFAULT);
+    Integer result = 0;
+    try {
+      result = Integer.parseInt(configuredValue);
+      if (result < 0) {
+        LOGGER.warn("Value of carbon.trash.expiration.days is negative, taking default value");
+        result = Integer.parseInt(CARBON_TRASH_EXPIRATION_DAYS_DEFAULT);
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.error("Error happened while parsing", e);

Review comment:
       ```suggestion
         LOGGER.error("Invalid value configured for " + CarbonCommonConstants.CARBON_TRASH_EXPIRATION_DAYS + ", considering the default value", e);
   ```
   
   Please refactor this: on a parse failure it should also fall back to the default value, but as written the result stays 0. You can also move the common message into a variable to reduce duplication and keep the code clean.
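
A minimal sketch of the fallback described above, in plain Java and under stated assumptions: the class name `TrashExpirationSketch`, the helper `parseDaysOrDefault`, and the use of `System.err` in place of the project's `LOGGER` are illustrative and not part of the PR. It returns the default for a missing, unparseable, or negative value, and builds the shared message once.

```java
// Hypothetical sketch, not the PR's code: all names here are illustrative.
final class TrashExpirationSketch {

  // Stand-in for CarbonCommonConstants.CARBON_TRASH_EXPIRATION_DAYS.
  static final String PROPERTY_NAME = "carbon.trash.expiration.days";

  private TrashExpirationSketch() {
  }

  /**
   * Parses the configured number of days and returns defaultDays when the
   * value is missing, not a number, or negative.
   */
  static int parseDaysOrDefault(String configuredValue, int defaultDays) {
    // Shared message, built once and used for every failure path.
    String fallbackMessage = "Invalid value configured for " + PROPERTY_NAME
        + ", considering the default value " + defaultDays;
    if (configuredValue != null) {
      try {
        int days = Integer.parseInt(configuredValue.trim());
        if (days >= 0) {
          return days;
        }
      } catch (NumberFormatException e) {
        // Fall through to the common fallback below.
      }
    }
    // The real code would log through LOGGER; System.err keeps this runnable.
    System.err.println(fallbackMessage);
    return defaultDays;
  }
}
```

With this shape, the caller never sees 0 by accident: every invalid input ends at the same `return defaultDays`.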

##########
File path: core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
##########
@@ -113,12 +116,24 @@ private static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTab
     SegmentUpdateStatusManager updateStatusManager =
         new SegmentUpdateStatusManager(carbonTable, currLoadDetails);
     for (final LoadMetadataDetails oneLoad : loadDetails) {
-      if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) {
+      if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete, carbonTable
+              .getAbsoluteTableIdentifier())) {
         try {
+          // if insert in progress, then move it to trash
+          if (oneLoad.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS && !carbonTable
+              .isHivePartitionTable()) {
+            // move this segment to trash
+            TrashUtil.copyDataToTrashBySegment(FileFactory.getCarbonFile(CarbonTablePath
+                .getFactDir(carbonTable.getTablePath()) + "/Part0/Segment_" + oneLoad

Review comment:
       Please do not hardcode the segment path (`"/Part0/Segment_"`) here.
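
One possible way to avoid the literal, sketched in standalone Java. The class `SegmentPathSketch`, its constants, and the `segmentPath` helper are hypothetical names chosen for illustration. The project may already expose an equivalent helper in `CarbonTablePath`, which would be preferable to adding a new one.

```java
// Hypothetical sketch, not the PR's code: names and constants are illustrative.
final class SegmentPathSketch {

  static final String FILE_SEPARATOR = "/";
  static final String PARTITION_PREFIX = "Part";
  static final String SEGMENT_PREFIX = "Segment_";

  private SegmentPathSketch() {
  }

  /**
   * Builds the segment directory path from named constants, so that call
   * sites do not repeat the "/Part0/Segment_" literal.
   */
  static String segmentPath(String factDir, String partitionId, String segmentId) {
    return factDir + FILE_SEPARATOR + PARTITION_PREFIX + partitionId
        + FILE_SEPARATOR + SEGMENT_PREFIX + segmentId;
  }
}
```

A call such as `SegmentPathSketch.segmentPath(factDir, "0", segmentId)` then replaces the string concatenation at the call site.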

##########
File path: core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
##########
@@ -47,6 +47,7 @@
   public static final String BATCH_PREFIX = "_batchno";
   private static final String LOCK_DIR = "LockFiles";
 
+  public static final String SEGMENTS_METADATA_FOLDER = "segments";

Review comment:
       Also use this constant in `getSegmentFilesLocation` and `getSegmentFilePath` in this class, and check once for other places that use the `"segments"` string.

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1143,28 +1148,62 @@ public static void cleanSegments(CarbonTable table,
    * @throws IOException
    */
   public static void deleteSegment(String tablePath, Segment segment,
-      List<PartitionSpec> partitionSpecs,
-      SegmentUpdateStatusManager updateStatusManager) throws Exception {
+      List<PartitionSpec> partitionSpecs, SegmentUpdateStatusManager updateStatusManager,
+      SegmentStatus segmentStatus, Boolean isPartitionTable, String timeStampForTrashFolder)
+      throws Exception {
     SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName());
     List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
         FileFactory.getConfiguration());
+    List<String> filesToDelete = new ArrayList<>();
     Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
     for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
-      FileFactory.deleteFile(entry.getKey());
+      // Move the file to the trash folder in case the segment status is insert in progress
+      if (segmentStatus == SegmentStatus.INSERT_IN_PROGRESS && isPartitionTable) {
+        TrashUtil.copyDataToTrashFolderByFile(tablePath, entry.getKey(), timeStampForTrashFolder +

Review comment:
       You are passing this many arguments in many places. Can we refactor to reduce the number of changes needed across call sites?
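
One common refactoring for this is a small parameter object, sketched below in standalone Java. Everything here is an assumption for illustration: the class `SegmentDeleteContextSketch`, its nested `Status` enum (a stand-in for `SegmentStatus`), and the method `shouldCopyToTrash` do not exist in the PR. The condition mirrors the check in the hunk above (in-progress status on a partition table).

```java
// Hypothetical sketch, not the PR's code: a parameter object that groups the
// three values added to deleteSegment's signature.
final class SegmentDeleteContextSketch {

  // Stand-in for the project's SegmentStatus enum.
  enum Status { SUCCESS, INSERT_IN_PROGRESS, MARKED_FOR_DELETE }

  final Status segmentStatus;
  final boolean partitionTable;
  final String timeStampForTrashFolder;

  SegmentDeleteContextSketch(Status segmentStatus, boolean partitionTable,
      String timeStampForTrashFolder) {
    this.segmentStatus = segmentStatus;
    this.partitionTable = partitionTable;
    this.timeStampForTrashFolder = timeStampForTrashFolder;
  }

  /**
   * Keeps the trash decision in one place instead of repeating the
   * condition at every call site.
   */
  boolean shouldCopyToTrash() {
    return segmentStatus == Status.INSERT_IN_PROGRESS && partitionTable;
  }
}
```

With such an object, a later change to the trash rules would touch one class rather than every method signature that currently carries the three extra arguments.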

##########
File path: core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.CarbonFileException;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.log4j.Logger;
+
+public final class TrashUtil {

Review comment:
       Please add a class-level comment.

##########
File path: core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.CarbonFileException;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.log4j.Logger;
+
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+          LogServiceFactory.getLogService(CarbonUtil.class.getName());
+
+  /**
+   * The below method copies the complete a file to the trash folder. Provide necessary
+   * timestamp and the segment number in the suffixToAdd  variable, so that the proper folder is
+   * created in the trash folder.
+   */
+  public static void copyDataToTrashFolderByFile(String carbonTablePath, String pathOfFileToCopy,
+      String suffixToAdd) {
+    String trashFolderPath = CarbonTablePath.getTrashFolderPath(carbonTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd;
+    try {
+      if (new File(pathOfFileToCopy).exists()) {
+        FileUtils.copyFileToDirectory(new File(pathOfFileToCopy), new File(trashFolderPath));
+        LOGGER.info("File: " + pathOfFileToCopy + " successfully copied to the trash folder: "
+                + trashFolderPath);
+      }
+    } catch (IOException e) {
+      LOGGER.error("Unable to copy " + pathOfFileToCopy + " to the trash folder", e);
+    }
+  }
+
+  /**
+   * The below method copies the complete segment folder to the trash folder. Provide necessary
+   * timestamp and the segment number in the suffixToAdd  variable, so that the proper folder is
+   * created in the trash folder.
+   */
+  public static void copyDataToTrashBySegment(CarbonFile path, String carbonTablePath,

Review comment:
       same as above

##########
File path: core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.CarbonFileException;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.log4j.Logger;
+
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+          LogServiceFactory.getLogService(CarbonUtil.class.getName());
+
+  /**
+   * The below method copies the complete a file to the trash folder. Provide necessary
+   * timestamp and the segment number in the suffixToAdd  variable, so that the proper folder is
+   * created in the trash folder.
+   */

Review comment:
       Please add the parameter names (`@param` tags) to the javadoc.

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import java.sql.Timestamp
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableClean <true> and clean garbage segment
+   * (MARKED_FOR_DELETE state) if forceTableClean <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> for force clean it will delete all data
+   *                               <false> it will clean garbage segment (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive Partitions  details
+   */
+  def cleanFiles(
+    dbName: String,
+    tableName: String,
+    tablePath: String,
+    carbonTable: CarbonTable,
+    forceTableClean: Boolean,
+    currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+    truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)
+        }
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+        currentTablePartitions match {
+          case Some(partitions) =>
+            SegmentFileStore.cleanSegments(
+              carbonTable,
+              currentTablePartitions.map(_.asJava).orNull,
+              true)
+          case _ =>
+        }
+      }
+    } finally {
+      if (currentTablePartitions.equals(None)) {
+        cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec])
+      } else {
+        cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList)
+      }
+
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+
+  /**
+   * delete partition folders recursively
+   *
+   * @param carbonTable
+   * @param partitionSpecList
+   */
+  def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable,
+      partitionSpecList: List[PartitionSpec]): Unit = {
+    if (carbonTable != null && carbonTable.isHivePartitionTable) {
+      val loadMetadataDetails = SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetadataPath)
+
+      val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath)
+
+      // list all files from table path
+      val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
+      loadMetadataDetails.foreach { metadataDetail =>
+        if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
+          metadataDetail.getSegmentFile == null) {
+          val loadStartTime: Long = metadataDetail.getLoadStartTime
+          // delete all files of @loadStartTime from table path
+          cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime)
+          partitionSpecList.foreach {
+            partitionSpec =>
+              val partitionLocation = partitionSpec.getLocation
+              // For partition folder outside the tablePath
+              if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
+                val partitionCarbonFile = FileFactory
+                  .getCarbonFile(partitionLocation.toString)
+                // list all files from partitionLocation
+                val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true)
+                // delete all files of @loadStartTime from externalPath
+                cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime)
+              }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   *
+   * @param carbonFiles
+   * @param timestamp
+   */

Review comment:
       Please add a method description and parameter descriptions. If they are not required, remove the empty doc comment above.

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import java.sql.Timestamp
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}

Review comment:
       Remove the unused imports here. Also, please select all the text from line 45 to the end of the class, except lines 306 to 324, and format it in one go (Ctrl+Alt+Shift+L).

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import java.sql.Timestamp
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableClean <true> and clean garbage segment
+   * (MARKED_FOR_DELETE state) if forceTableClean <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> for force clean it will delete all data
+   *                               <false> it will clean garbage segment (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive Partitions  details
+   */
+  def cleanFiles(
+    dbName: String,
+    tableName: String,
+    tablePath: String,
+    carbonTable: CarbonTable,
+    forceTableClean: Boolean,
+    currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+    truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)
+        }
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+        currentTablePartitions match {
+          case Some(partitions) =>
+            SegmentFileStore.cleanSegments(
+              carbonTable,
+              currentTablePartitions.map(_.asJava).orNull,
+              true)
+          case _ =>
+        }
+      }
+    } finally {
+      if (currentTablePartitions.equals(None)) {
+        cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec])
+      } else {
+        cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList)
+      }
+
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+
+  /**
+   * delete partition folders recursively
+   *
+   * @param carbonTable
+   * @param partitionSpecList
+   */
+  def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable,
+      partitionSpecList: List[PartitionSpec]): Unit = {
+    if (carbonTable != null && carbonTable.isHivePartitionTable) {
+      val loadMetadataDetails = SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetadataPath)
+
+      val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath)
+
+      // list all files from table path
+      val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
+      loadMetadataDetails.foreach { metadataDetail =>
+        if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
+          metadataDetail.getSegmentFile == null) {
+          val loadStartTime: Long = metadataDetail.getLoadStartTime
+          // delete all files of @loadStartTime from table path
+          cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime)
+          partitionSpecList.foreach {
+            partitionSpec =>
+              val partitionLocation = partitionSpec.getLocation
+              // For partition folder outside the tablePath
+              if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
+                val partitionCarbonFile = FileFactory
+                  .getCarbonFile(partitionLocation.toString)
+                // list all files from partitionLocation
+                val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true)
+                // delete all files of @loadStartTime from externalPath
+                cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime)
+              }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   *
+   * @param carbonFiles
+   * @param timestamp
+   */
+  private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],
+      timestamp: Long): Unit = {
+    carbonFiles.asScala.foreach { carbonFile =>
+        val filePath = carbonFile.getPath
+        val fileName = carbonFile.getName
+        if (CarbonTablePath.DataFileUtil.compareCarbonFileTimeStamp(fileName, timestamp)) {
+          FileFactory.deleteFile(filePath)
+        }
+    }
+  }
+
+  /**
+   * The in-progress segments which are in stale state will be marked as deleted
+   * when driver is initializing.
+   *
+   * @param databaseLocation
+   * @param dbName
+   */
+  def cleanInProgressSegments(databaseLocation: String, dbName: String): Unit = {
+    val loaderDriver = CarbonProperties.getInstance().
+      getProperty(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER,
+        CarbonCommonConstants.DATA_MANAGEMENT_DRIVER_DEFAULT).toBoolean
+    if (!loaderDriver) {
+      return
+    }
+    try {
+      if (FileFactory.isFileExist(databaseLocation)) {
+        val file = FileFactory.getCarbonFile(databaseLocation)
+        if (file.isDirectory) {
+          val tableFolders = file.listFiles()
+          tableFolders.foreach { tableFolder =>
+            if (tableFolder.isDirectory) {
+              val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR +
+               tableFolder.getName
+              val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableFolder.getName)
+              val tableStatusFile =
+                CarbonTablePath.getTableStatusFilePath(tablePath)
+              if (FileFactory.isFileExist(tableStatusFile)) {
+                try {
+                  val carbonTable = CarbonMetadata.getInstance
+                    .getCarbonTable(tableUniqueName)

Review comment:
       Move this line up to the line above.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
##########
@@ -80,40 +113,98 @@ case class CarbonCleanFilesCommand(
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    // if insert overwrite in progress, do not allow delete segment
-    if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
-      throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
+    if (!isDryRun) {
+      // if insert overwrite in progress, do not allow delete segment
+      if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
+        throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
+      }
+      val operationContext = new OperationContext
+      val cleanFilesPreEvent: CleanFilesPreEvent =
+        CleanFilesPreEvent(carbonTable,
+          sparkSession)
+      OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent, operationContext)
+      if (tableName.isDefined) {
+        Checker.validateTableExists(databaseNameOp, tableName.get, sparkSession)
+        if (forceTrashClean) {
+          CleanFilesUtil.deleteDataFromTrashFolder(carbonTable, sparkSession)
+        } else {
+          // clear trash based on timestamp
+          CleanFilesUtil.deleteDataFromTrashFolderByTimeStamp(carbonTable, sparkSession)
+        }
+        if (forceTableClean) {
+          deleteAllData(sparkSession, databaseNameOp, tableName.get)
+        } else {
+          cleanGarbageData(sparkSession, databaseNameOp, tableName.get)
+        }
+        // delete partial load and send them to trash
+        TableProcessingOperations
+          .deletePartialLoadDataIfExist(carbonTable, false)
+        // clean stash in metadata folder too
+        deleteStashInMetadataFolder(carbonTable)
+      } else {
+        cleanGarbageDataInAllTables(sparkSession)
+      }
+      if (cleanFileCommands != null) {
+        cleanFileCommands.foreach(_.processData(sparkSession))
+      }
+      val cleanFilesPostEvent: CleanFilesPostEvent =
+        CleanFilesPostEvent(carbonTable, sparkSession)
+      OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent, operationContext)
+      Seq.empty
+    } else if (isDryRun && tableName.isDefined) {
+      // dry run, do not clean anything and do not delete trash too
+      CleanFilesUtil.cleanFilesDryRun(carbonTable, sparkSession)
+    }
+    else {

Review comment:
       Move this `else` up onto the line above, after the closing brace.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
##########
@@ -80,40 +113,98 @@ case class CarbonCleanFilesCommand(
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    // if insert overwrite in progress, do not allow delete segment
-    if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
-      throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
+    if (!isDryRun) {
+      // if insert overwrite in progress, do not allow delete segment
+      if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
+        throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
+      }
+      val operationContext = new OperationContext
+      val cleanFilesPreEvent: CleanFilesPreEvent =
+        CleanFilesPreEvent(carbonTable,
+          sparkSession)

Review comment:
       Move lines 123 and 124 up onto line 122.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
##########
@@ -80,40 +113,98 @@ case class CarbonCleanFilesCommand(
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    // if insert overwrite in progress, do not allow delete segment
-    if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
-      throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
+    if (!isDryRun) {
+      // if insert overwrite in progress, do not allow delete segment
+      if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
+        throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
+      }
+      val operationContext = new OperationContext
+      val cleanFilesPreEvent: CleanFilesPreEvent =
+        CleanFilesPreEvent(carbonTable,
+          sparkSession)
+      OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent, operationContext)
+      if (tableName.isDefined) {
+        Checker.validateTableExists(databaseNameOp, tableName.get, sparkSession)
+        if (forceTrashClean) {
+          CleanFilesUtil.deleteDataFromTrashFolder(carbonTable, sparkSession)
+        } else {
+          // clear trash based on timestamp
+          CleanFilesUtil.deleteDataFromTrashFolderByTimeStamp(carbonTable, sparkSession)
+        }
+        if (forceTableClean) {
+          deleteAllData(sparkSession, databaseNameOp, tableName.get)
+        } else {
+          cleanGarbageData(sparkSession, databaseNameOp, tableName.get)
+        }
+        // delete partial load and send them to trash
+        TableProcessingOperations
+          .deletePartialLoadDataIfExist(carbonTable, false)
+        // clean stash in metadata folder too
+        deleteStashInMetadataFolder(carbonTable)
+      } else {
+        cleanGarbageDataInAllTables(sparkSession)
+      }
+      if (cleanFileCommands != null) {
+        cleanFileCommands.foreach(_.processData(sparkSession))
+      }
+      val cleanFilesPostEvent: CleanFilesPostEvent =
+        CleanFilesPostEvent(carbonTable, sparkSession)
+      OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent, operationContext)
+      Seq.empty
+    } else if (isDryRun && tableName.isDefined) {
+      // dry run, do not clean anything and do not delete trash too
+      CleanFilesUtil.cleanFilesDryRun(carbonTable, sparkSession)
+    }
+    else {
+      Seq.empty
     }
-    val operationContext = new OperationContext
-    val cleanFilesPreEvent: CleanFilesPreEvent =
-      CleanFilesPreEvent(carbonTable,
-        sparkSession)
-    OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent, operationContext)
-    if (tableName.isDefined) {
-      Checker.validateTableExists(databaseNameOp, tableName.get, sparkSession)
-      if (forceTableClean) {
-        deleteAllData(sparkSession, databaseNameOp, tableName.get)
+  }
+
+  def deleteStashInMetadataFolder(carbonTable: CarbonTable): Unit = {
+    val tableStatusLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
+    val carbonLoadModel = new CarbonLoadModel
+    try {
+      if (tableStatusLock.lockWithRetries()) {
+        val tableStatusFilePath = CarbonTablePath
+          .getTableStatusFilePath(carbonTable.getTablePath)
+        val loadMetaDataDetails = SegmentStatusManager
+          .readTableStatusFile(tableStatusFilePath).filter(details => details.getSegmentStatus ==
+          SegmentStatus.SUCCESS || details.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS)
+          .sortWith(_.getLoadName < _.getLoadName)
+        carbonLoadModel.setLoadMetadataDetails(loadMetaDataDetails.toList.asJava)
       } else {
-        cleanGarbageData(sparkSession, databaseNameOp, tableName.get)
+        throw new ConcurrentOperationException(carbonTable.getDatabaseName,
+          carbonTable.getTableName, "table status read", "clean files command")
       }
-    } else {
-      cleanGarbageDataInAllTables(sparkSession)
+    } finally {
+      tableStatusLock.unlock()
     }
-    if (cleanFileCommands != null) {
-      cleanFileCommands.foreach(_.processData(sparkSession))
+    val loadMetaDataDetails = carbonLoadModel.getLoadMetadataDetails.asScala
+    val segmentFileList = loadMetaDataDetails.map(f => CarbonTablePath.getSegmentFilesLocation(
+      carbonTable.getTablePath) + CarbonCommonConstants.FILE_SEPARATOR + f.getSegmentFile)
+
+    val metaDataPath = CarbonTablePath.getMetadataPath(carbonTable.getTablePath) +
+      CarbonCommonConstants.FILE_SEPARATOR + "segments"

Review comment:
       Use the constant you have already defined for `"segments"`.
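
A minimal sketch of what this comment asks for: define the folder name once and build the path from that definition, rather than repeating the string literal. The class, constant and method names below are hypothetical and are not taken from the CarbonData code base. The hunk above already calls `CarbonTablePath.getSegmentFilesLocation(...)`, which may be the helper the reviewer has in mind.

```java
// Hypothetical sketch: names are illustrative, not CarbonData's own.
final class SegmentPaths {
  // Single definition of the folder name, reused wherever the path is built.
  static final String SEGMENTS_FOLDER = "segments";
  static final String FILE_SEPARATOR = "/";

  private SegmentPaths() { }

  // Builds <metadataPath>/segments without repeating the string literal.
  static String segmentFilesLocation(String metadataPath) {
    return metadataPath + FILE_SEPARATOR + SEGMENTS_FOLDER;
  }
}
```

With a single definition, renaming the folder later would only touch one line.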

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
##########
@@ -80,40 +113,98 @@ case class CarbonCleanFilesCommand(
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    // if insert overwrite in progress, do not allow delete segment
-    if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
-      throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
+    if (!isDryRun) {
+      // if insert overwrite in progress, do not allow delete segment
+      if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
+        throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
+      }
+      val operationContext = new OperationContext
+      val cleanFilesPreEvent: CleanFilesPreEvent =
+        CleanFilesPreEvent(carbonTable,
+          sparkSession)
+      OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent, operationContext)
+      if (tableName.isDefined) {
+        Checker.validateTableExists(databaseNameOp, tableName.get, sparkSession)
+        if (forceTrashClean) {
+          CleanFilesUtil.deleteDataFromTrashFolder(carbonTable, sparkSession)
+        } else {
+          // clear trash based on timestamp
+          CleanFilesUtil.deleteDataFromTrashFolderByTimeStamp(carbonTable, sparkSession)
+        }
+        if (forceTableClean) {
+          deleteAllData(sparkSession, databaseNameOp, tableName.get)
+        } else {
+          cleanGarbageData(sparkSession, databaseNameOp, tableName.get)
+        }
+        // delete partial load and send them to trash
+        TableProcessingOperations
+          .deletePartialLoadDataIfExist(carbonTable, false)
+        // clean stash in metadata folder too
+        deleteStashInMetadataFolder(carbonTable)
+      } else {
+        cleanGarbageDataInAllTables(sparkSession)
+      }
+      if (cleanFileCommands != null) {
+        cleanFileCommands.foreach(_.processData(sparkSession))
+      }
+      val cleanFilesPostEvent: CleanFilesPostEvent =
+        CleanFilesPostEvent(carbonTable, sparkSession)
+      OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent, operationContext)
+      Seq.empty
+    } else if (isDryRun && tableName.isDefined) {
+      // dry run, do not clean anything and do not delete trash too
+      CleanFilesUtil.cleanFilesDryRun(carbonTable, sparkSession)
+    }
+    else {
+      Seq.empty
     }
-    val operationContext = new OperationContext
-    val cleanFilesPreEvent: CleanFilesPreEvent =
-      CleanFilesPreEvent(carbonTable,
-        sparkSession)
-    OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent, operationContext)
-    if (tableName.isDefined) {
-      Checker.validateTableExists(databaseNameOp, tableName.get, sparkSession)
-      if (forceTableClean) {
-        deleteAllData(sparkSession, databaseNameOp, tableName.get)
+  }
+
+  def deleteStashInMetadataFolder(carbonTable: CarbonTable): Unit = {

Review comment:
       Add a method-level comment.

##########
File path: processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
##########
@@ -149,9 +132,171 @@ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
       } finally {
         carbonTableStatusLock.unlock();
       }
+    } else {
+

Review comment:
       Remove the empty line.

##########
File path: core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
##########
@@ -138,8 +143,19 @@ public boolean accept(CarbonFile file) {
               if (filesToBeDeleted.length == 0) {
                 status = true;
               } else {
-
                 for (CarbonFile eachFile : filesToBeDeleted) {
+                  // If the file to be deleted is a carbondata file, index file, index merge file
+                  // or a delta file, copy that file to the trash folder.
+                  if ((eachFile.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT) ||

Review comment:
       Explain in the comment why this is needed.
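
A minimal sketch of a comment that states the reason as well as the action. The rationale given here (keeping a recoverable copy of the data files) is an assumption drawn from the PR's trash-folder tests, not a statement by the author. The extension strings and all names are hypothetical stand-ins for the `CarbonCommonConstants` values used in the real check.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: extensions and names are assumptions for illustration.
final class TrashCandidates {
  private static final List<String> DATA_FILE_EXTENSIONS =
      Arrays.asList(".carbondata", ".carbonindex", ".carbonindexmerge", ".deletedelta");

  private TrashCandidates() { }

  // Data, index, merge index and delta files together make up a segment.
  // They are copied to the trash folder before deletion (assumed reason) so that
  // a segment cleaned by mistake can still be recovered from there.
  // Any other file can be recreated and is deleted directly.
  static boolean shouldCopyToTrash(String fileName) {
    for (String extension : DATA_FILE_EXTENSIONS) {
      if (fileName.endsWith(extension)) {
        return true;
      }
    }
    return false;
  }
}
```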

##########
File path: processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
##########
@@ -149,9 +132,171 @@ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
       } finally {
         carbonTableStatusLock.unlock();
       }
+    } else {
+
+      int retryCount = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+          CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+      int maxTimeout = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+          CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
+      ICarbonLock carbonTableStatusLock = CarbonLockFactory
+          .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK);
+
+      try {
+        if (carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout)) {
+

Review comment:
       Remove the empty lines.

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableClean is <true> and cleans garbage segments
+   * (MARKED_FOR_DELETE state) if forceTableClean is <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> for force clean it will delete all data
+   *                               <false> it will clean garbage segment (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive Partitions  details
+   */
+  def cleanFiles(
+                  dbName: String,
+                  tableName: String,
+                  tablePath: String,
+                  carbonTable: CarbonTable,
+                  forceTableClean: Boolean,
+                  currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+                  truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)
+        }
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+        currentTablePartitions match {
+          case Some(partitions) =>
+            SegmentFileStore.cleanSegments(
+              carbonTable,
+              currentTablePartitions.map(_.asJava).orNull,
+              true)
+          case _ =>
+        }
+      }
+    } finally {
+      if (currentTablePartitions.equals(None)) {
+        cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec])
+      } else {
+        cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList)
+      }
+
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+
+  /**
+   * delete partition folders recursively
+   *
+   * @param carbonTable
+   * @param partitionSpecList
+   */
+  def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable,
+                                         partitionSpecList: List[PartitionSpec]): Unit = {
+    if (carbonTable != null && carbonTable.isHivePartitionTable) {
+      val loadMetadataDetails = SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetadataPath)
+
+      val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath)
+
+      // list all files from table path
+      val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
+      loadMetadataDetails.foreach { metadataDetail =>
+        if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
+          metadataDetail.getSegmentFile == null) {
+          val loadStartTime: Long = metadataDetail.getLoadStartTime
+          // delete all files of @loadStartTime from table path
+          cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime)
+          partitionSpecList.foreach {
+            partitionSpec =>
+              val partitionLocation = partitionSpec.getLocation
+              // For partition folder outside the tablePath
+              if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
+                val partitionCarbonFile = FileFactory
+                  .getCarbonFile(partitionLocation.toString)
+                // list all files from partitionLocation
+                val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true)
+                // delete all files of @loadStartTime from externalPath
+                cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime)
+              }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   *
+   * @param carbonFiles
+   * @param timestamp
+   */
+  private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],
+                                       timestamp: Long): Unit = {
+    carbonFiles.asScala.foreach {
+      carbonFile =>
+        val filePath = carbonFile.getPath
+        val fileName = carbonFile.getName
+        if (CarbonTablePath.DataFileUtil.compareCarbonFileTimeStamp(fileName, timestamp)) {
+          // delete the file
+          FileFactory.deleteFile(filePath)
+        }
+    }
+  }
+
+  /**
+   * The in-progress segments which are in stale state will be marked as deleted
+   * when driver is initializing.
+   *
+   * @param databaseLocation
+   * @param dbName
+   */
+  def cleanInProgressSegments(databaseLocation: String, dbName: String): Unit = {
+    val loaderDriver = CarbonProperties.getInstance().
+      getProperty(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER,
+        CarbonCommonConstants.DATA_MANAGEMENT_DRIVER_DEFAULT).toBoolean
+    if (!loaderDriver) {
+      return
+    }
+    try {
+      if (FileFactory.isFileExist(databaseLocation)) {
+        val file = FileFactory.getCarbonFile(databaseLocation)
+        if (file.isDirectory) {
+          val tableFolders = file.listFiles()
+          tableFolders.foreach { tableFolder =>
+            if (tableFolder.isDirectory) {
+              val tablePath = databaseLocation +
+                CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+              val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableFolder.getName)
+              val tableStatusFile =
+                CarbonTablePath.getTableStatusFilePath(tablePath)
+              if (FileFactory.isFileExist(tableStatusFile)) {
+                try {
+                  val carbonTable = CarbonMetadata.getInstance
+                    .getCarbonTable(tableUniqueName)
+                  SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true, null)
+                } catch {
+                  case _: Exception =>
+                    LOGGER.warn(s"Error while cleaning table " +
+                      s"${tableUniqueName}")
+                }
+              }
+            }
+          }
+        }
+      }
+    } catch {
+      case s: java.io.FileNotFoundException =>
+        // Create folders and files.
+        LOGGER.error(s)
+    }
+  }
+
+  /**
+   * The below method deletes all the files and folders in the trash folders of all carbon tables
+   * in all databases
+   */
+  def deleteDataFromTrashFolderInAllTables(sparkSession: SparkSession): Unit = {
+    try {
+      val databases = sparkSession.sessionState.catalog.listDatabases()
+      databases.foreach(dbName => {
+        val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
+        if (FileFactory.isFileExist(databaseLocation)) {
+          val file = FileFactory.getCarbonFile(databaseLocation)
+          if (file.isDirectory) {
+            val tableFolders = file.listFiles()
+            tableFolders.foreach { tableFolder =>
+              if (tableFolder.isDirectory) {
+                val tablePath = databaseLocation +
+                  CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+                TrashUtil.deleteAllDataFromTrashFolder(tablePath)
+              }
+            }
+          }
+        }
+      })
+    } catch {
+      case e: Throwable =>
+        // catch all exceptions to avoid failure
+        LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+          .error("Failed to clear trash folder of all tables", e)
+    }
+  }
+
+
+  def cleanFilesDryRunOp(carbonTable: CarbonTable, sparkSession: SparkSession): Seq[Row] = {
+    // dry run for clean files command
+    // Clean files will remove compacted, Marked_for_delete, Insert in progress(stale) segments.
+
+    val tableStatusLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(carbonTable.getDatabaseName)

Review comment:
       Handle review comments promptly, reply to them, and mark each conversation resolved once it is handled.

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
##########
@@ -0,0 +1,540 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.cleanfiles
+
+import java.io.{File, PrintWriter}
+
+import scala.io.Source
+
+import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+
+class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
+
+  var count = 0
+
+  test("clean up table and test trash folder with In Progress segments") {
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+    sql(
+      """
+        | CREATE TABLE cleantest (name String, id Int)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    // run a select query before deletion
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(3)))
+
+    val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val tableStatusFilePath = path + CarbonCommonConstants.FILE_SEPARATOR + "Metadata" +
+      CarbonCommonConstants.FILE_SEPARATOR + "tableStatus"
+    editTableStatusFile(path)
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR +
+      CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME
+
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    val dryRun = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('isDryRun'='true')").count()
+    // dry run shows 3 segments to move to trash
+    assert(dryRun == 3)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(0)))
+    assert(FileFactory.isFileExist(trashFolderPath))
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 6)
+
+    val dryRun1 = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('isDryRun'='true')").count()
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 6)
+
+
+    val timeStamp = getTimestampFolderName(trashFolderPath)
+
+    // recovering data from trash folder
+    sql(
+      """
+        | CREATE TABLE cleantest1 (name String, id Int)
+        | STORED AS carbondata
+      """.stripMargin)
+
+    val segment0Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp +
+      CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '0'
+    val segment1Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp +
+      CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '1'
+    val segment2Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp +
+      CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '2'
+
+    sql(s"alter table cleantest1 add segment options('path'='$segment0Path'," +
+      s"'format'='carbon')").show()
+    sql(s"alter table cleantest1 add segment options('path'='$segment1Path'," +
+      s"'format'='carbon')").show()
+    sql(s"alter table cleantest1 add segment options('path'='$segment2Path'," +
+      s"'format'='carbon')").show()
+    sql(s"""INSERT INTO CLEANTEST SELECT * from cleantest1""")
+
+    // test after recovering data from trash
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(3)))
+
+    sql(s"CLEAN FILES FOR TABLE cleantest options('force'='true')").show
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    // force clean leaves no files in the trash folder
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+
+  test("clean up maintable table and test trash folder with SI with IN PROGRESS segments") {
+
+    sql("""DROP TABLE IF EXISTS CLEANTEST_WITHSI""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+    sql(
+      """
+        | CREATE TABLE CLEANTEST_WITHSI (id Int, name String, add String )
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(s"""INSERT INTO CLEANTEST_WITHSI SELECT 1,"abc","def"""")
+    sql(s"""INSERT INTO CLEANTEST_WITHSI SELECT 2, "abc","def"""")
+    sql(s"""INSERT INTO CLEANTEST_WITHSI SELECT 3, "abc","def"""")
+
+    sql(s"""CREATE INDEX SI_CLEANTEST on cleantest_withSI(add) as 'carbondata' """)
+
+    checkAnswer(sql(s"""select count(*) from cleantest_withSI"""),
+      Seq(Row(3)))
+    checkAnswer(sql(s"""select count(*) from si_cleantest"""),
+      Seq(Row(3)))
+
+    val mainTablePath = CarbonEnv.getCarbonTable(Some("default"), "cleantest_withsi")(sqlContext
+      .sparkSession).getTablePath
+    editTableStatusFile(mainTablePath)
+    val mainTableTrashFolderPath = mainTablePath + CarbonCommonConstants.FILE_SEPARATOR +
+      CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME
+
+    val siTablePath = CarbonEnv.getCarbonTable(Some("default"), "si_cleantest")(sqlContext
+      .sparkSession).getTablePath
+    editTableStatusFile(siTablePath)
+    val siTableTrashFolderPath = siTablePath + CarbonCommonConstants.FILE_SEPARATOR +
+      CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME
+
+    assert(!FileFactory.isFileExist(mainTableTrashFolderPath))
+    assert(!FileFactory.isFileExist(siTableTrashFolderPath))
+
+    val dryRun = sql(s"CLEAN FILES FOR TABLE cleantest_withsi OPTIONS('isDryRun'='true')").count()
+    // dry run shows 6 segments to move to trash. 3 for main table, 3 for si table
+    assert(dryRun == 6)
+
+    sql(s"CLEAN FILES FOR TABLE CLEANTEST_WITHSI").show()
+
+    checkAnswer(sql(s"""select count(*) from cleantest_withSI"""), Seq(Row(0)))
+    checkAnswer(sql(s"""select count(*) from si_cleantest"""), Seq(Row(0)))
+
+    assert(FileFactory.isFileExist(mainTableTrashFolderPath))
+    assert(FileFactory.isFileExist(siTableTrashFolderPath))
+
+    count = 0
+    var listMainTable = getFileCountInTrashFolder(mainTableTrashFolderPath)
+    assert(listMainTable == 6)
+
+    count = 0
+    var listSITable = getFileCountInTrashFolder(siTableTrashFolderPath)
+    assert(listSITable == 6)
+
+    val dryRun1 = sql(s"CLEAN FILES FOR TABLE cleantest_withsi OPTIONS('isDryRun'='true')").count()
+    // dry run shows 6 segments to move to trash. 3 for main table, 3 for si table
+    assert(dryRun1 == 6)
+    // recovering data from trash folder
+
+    val timeStamp = getTimestampFolderName(mainTableTrashFolderPath)
+
+
+    sql(
+      """
+        | CREATE TABLE cleantest1 (id Int, name String, add String )
+        | STORED AS carbondata
+      """.stripMargin)
+
+    val segment0Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR +
+      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '0'
+    val segment1Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR +
+      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '1'
+    val segment2Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR +
+      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '2'
+
+    sql(s"alter table cleantest1 add segment options('path'='$segment0Path'," +
+      s"'format'='carbon')").show()
+    sql(s"alter table cleantest1 add segment options('path'='$segment1Path'," +
+      s"'format'='carbon')").show()
+    sql(s"alter table cleantest1 add segment options('path'='$segment2Path'," +
+      s"'format'='carbon')").show()
+    sql(s"""INSERT INTO CLEANTEST_withsi SELECT * from cleantest1""")
+
+    checkAnswer(sql(s"""select count(*) from cleantest_withSI"""),
+      Seq(Row(3)))
+
+
+    sql(s"CLEAN FILES FOR TABLE cleantest_withsi options('force'='true')").show
+
+    // no files in trash anymore
+    count = 0
+    listMainTable = getFileCountInTrashFolder(mainTableTrashFolderPath)
+    assert(listMainTable == 0)
+
+    count = 0
+    listSITable = getFileCountInTrashFolder(siTableTrashFolderPath)
+    assert(listSITable == 0)
+
+    sql("show segments for table cleantest_withsi").show()
+    sql("show segments for table si_cleantest").show()
+
+    sql("""DROP TABLE IF EXISTS CLEANTEST_WITHSI""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+
+  }
+
+
+  test("clean up table and test trash folder with Marked For Delete segments") {
+    // do not send MFD folders to trash
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+    sql(
+      """
+        | CREATE TABLE cleantest (name String, id Int)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+
+    val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR +
+      CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"""Delete from table cleantest where segment.id in(1)""")
+    val dryRun = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('isDryRun'='true')").count()
+    // dry run shows 1 Marked For Delete segment to be deleted
+    assert(dryRun == 1)
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+
+  test("clean up table and test trash folder with compaction") {
+    // do not send compacted folders to trash
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+    sql(
+      """
+        | CREATE TABLE cleantest (name String, id Int)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""ALTER TABLE CLEANTEST COMPACT "MINOR" """)
+
+    val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR +
+      CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    val dryRun = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('isDryRun'='true')").count()
+    // dry run shows 4 compacted segments to be deleted
+    assert(dryRun == 4)
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+
+  test("clean up table and test trash folder with stale segments") {
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+    sql(
+      """
+        | CREATE TABLE cleantest (name String, id Int)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 2""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 2""")
+    val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR +
+      CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    // First 3 segments are made as stale segments, they should be moved to the trash folder
+    createStaleSegments(path)
+    val dryRun = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('isDryRun'='true')").count()
+   // sql(s"""INSERT INTO CLEANTEST SELECT "abc", 3""")
+    assert(dryRun == 3)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 6)
+
+    val dryRun1 = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('isDryRun'='true')").count()
+    assert(dryRun1 == 3)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+
+  test("clean up table and test trash folder with partition table") {
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+
+    sql(
+      """
+        | CREATE TABLE CLEANTEST (id Int, id1 INT ) PARTITIONED BY (add String)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"abc"""")
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"abc"""")
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"adc"""")
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"adc"""")
+    val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR +
+      CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME
+    assert(!FileFactory.isFileExist(trashFolderPath))
+
+    editTableStatusFile(path)
+
+    val dryRun = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('isDryRun'='true')").count()
+    // dry run shows 4 segments to move to trash
+    assert(dryRun == 4)
+
+
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 8)
+
+    val dryRun1 = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('isDryRun'='true')").count()
+    // dry run shows 4 segments to move to trash
+    assert(dryRun1 == 4)
+
+    // try recovering data and do select count(*)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    count = 0
+    // no carbondata file is in the trash now, everything has been deleted
+    list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 0)
+
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+
+
+
+  test("clean up table and test trash folder with stale segments in partition table") {
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+
+    sql(
+      """
+        | CREATE TABLE CLEANTEST (id Int, id1 INT ) PARTITIONED BY (add String)
+        | STORED AS carbondata
+      """.stripMargin)
+
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"abc"""")
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"abc"""")
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"adc"""")
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"adc"""")
+
+    val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR +
+      CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    // All 4  segments are made as stale segments, they should be moved to the trash folder
+    createStaleSegments(path)
+
+    val dryRun = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('isDryRun'='true')").count()
+    // dry run shows 3 segments to move to trash
+    assert(dryRun == 3)
+
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 8)
+
+    val dryRun1 = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('isDryRun'='true')").count()
+    // dry run shows 4 segments to move to trash
+    assert(dryRun1 == 4)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+

Review comment:
       Why are there two blank lines here? Please check and remove unnecessary empty lines.





[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r513197605



##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1143,28 +1148,62 @@ public static void cleanSegments(CarbonTable table,
    * @throws IOException
    */
   public static void deleteSegment(String tablePath, Segment segment,
-      List<PartitionSpec> partitionSpecs,
-      SegmentUpdateStatusManager updateStatusManager) throws Exception {
+      List<PartitionSpec> partitionSpecs, SegmentUpdateStatusManager updateStatusManager,
+      SegmentStatus segmentStatus, Boolean isPartitionTable, String timeStampForTrashFolder)
+      throws Exception {
     SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName());
     List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
         FileFactory.getConfiguration());
+    List<String> filesToDelete = new ArrayList<>();
     Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
     for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
-      FileFactory.deleteFile(entry.getKey());
+      // Move the file to the trash folder in case the segment status is insert in progress
+      if (segmentStatus == SegmentStatus.INSERT_IN_PROGRESS && isPartitionTable) {
+        TrashUtil.copyDataToTrashFolderByFile(tablePath, entry.getKey(), timeStampForTrashFolder +

Review comment:
       changed
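
The hunk above copies an index file into the trash folder instead of deleting it outright. A minimal sketch of that idea follows, using only `java.nio.file`. It is not the PR's `TrashUtil`: the class name, the `.Trash` directory name and the `Segment_0` folder name are assumptions made for illustration, and the layout `<table>/<trash>/<timestamp>/<segment>/` is inferred from the paths the tests in this thread build.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class TrashCopySketch {
  // Hypothetical name; the real value of CARBON_TRASH_FOLDER_NAME is not shown in this thread.
  static final String TRASH_DIR = ".Trash";

  // Copies one file to <tablePath>/<TRASH_DIR>/<timestamp>/<segmentDir>/<fileName>
  // and returns the path of the copy. The source file is left in place.
  static Path copyToTrash(Path tablePath, Path file, String timestamp, String segmentDir)
      throws IOException {
    Path targetDir = tablePath.resolve(TRASH_DIR).resolve(timestamp).resolve(segmentDir);
    Files.createDirectories(targetDir);
    Path target = targetDir.resolve(file.getFileName());
    return Files.copy(file, target, StandardCopyOption.REPLACE_EXISTING);
  }

  public static void main(String[] args) throws IOException {
    Path table = Files.createTempDirectory("table");
    Path index = Files.createFile(table.resolve("0.carbonindex"));
    // "Segment_0" is an assumed folder name used only for this demo.
    Path copy = copyToTrash(table, index, String.valueOf(System.currentTimeMillis()), "Segment_0");
    System.out.println("copied to " + copy);
  }
}
```

Copying first and deleting the source only after the copy succeeds keeps the data recoverable if the operation is interrupted halfway.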





[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r513198165



##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
##########
@@ -2116,6 +2087,26 @@ public int getMaxSIRepairLimit(String dbName, String tableName) {
     return Math.abs(Integer.parseInt(thresholdValue));
   }
 
+  /**
+   * The below method returns the microseconds after which the trash folder will expire
+   */
+  public long getTrashFolderExpirationTime() {
+    String configuredValue = getProperty(CarbonCommonConstants.CARBON_TRASH_EXPIRATION_DAYS,
+            CarbonCommonConstants.CARBON_TRASH_EXPIRATION_DAYS_DEFAULT);
+    Integer result = 0;
+    try {
+      result = Integer.parseInt(configuredValue);
+      if (result < 0) {
+        LOGGER.warn("Value of carbon.trash.expiration.days is negative, taking default value");
+        result = Integer.parseInt(CARBON_TRASH_EXPIRATION_DAYS_DEFAULT);
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.error("Error happened while parsing", e);

Review comment:
       done
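
The parsing logic in the hunk above (read a day count, fall back to a default on a negative or malformed value) can be sketched in isolation as below. This is an illustration under stated assumptions, not the PR's final code: the class name and the default of 3 days are hypothetical, and the conversion uses milliseconds, whereas the Javadoc in the diff says microseconds and the unit actually returned is not visible in this thread.

```java
import java.util.concurrent.TimeUnit;

final class TrashExpirySketch {
  // Hypothetical default; the real CARBON_TRASH_EXPIRATION_DAYS_DEFAULT is not shown here.
  static final int DEFAULT_DAYS = 3;

  // Returns the configured number of days, or DEFAULT_DAYS when the value
  // is missing, not a number, or negative.
  static int parseExpirationDays(String configuredValue) {
    if (configuredValue == null) {
      return DEFAULT_DAYS;
    }
    try {
      int days = Integer.parseInt(configuredValue.trim());
      return days < 0 ? DEFAULT_DAYS : days;
    } catch (NumberFormatException e) {
      return DEFAULT_DAYS;
    }
  }

  // Converts days to milliseconds as a long, so large day counts do not overflow int.
  static long toMillis(int days) {
    return TimeUnit.DAYS.toMillis(days);
  }

  public static void main(String[] args) {
    int days = parseExpirationDays("7");
    System.out.println(days + " days = " + toMillis(days) + " ms");
  }
}
```

Doing the conversion in `long` matters because an `int` product of days and milliseconds per day overflows from 25 days onward.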





[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r513204008



##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
##########
@@ -0,0 +1,540 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.cleanfiles
+
+import java.io.{File, PrintWriter}
+
+import scala.io.Source
+
+import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+
+class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
+
+  var count = 0
+
+  test("clean up table and test trash folder with In Progress segments") {
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+    sql(
+      """
+        | CREATE TABLE cleantest (name String, id Int)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    // run a select query before deletion
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(3)))
+
+    val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val tableStatusFilePath = path + CarbonCommonConstants.FILE_SEPARATOR + "Metadata" +
+      CarbonCommonConstants.FILE_SEPARATOR + "tableStatus"
+    editTableStatusFile(path)
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR +
+      CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME
+
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    val dryRun = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('isDryRun'='true')").count()
+    // dry run shows 3 segments to move to trash
+    assert(dryRun == 3)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(0)))
+    assert(FileFactory.isFileExist(trashFolderPath))
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 6)
+
+    val dryRun1 = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('isDryRun'='true')").count()
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 6)
+
+
+    val timeStamp = getTimestampFolderName(trashFolderPath)
+
+    // recovering data from trash folder
+    sql(
+      """
+        | CREATE TABLE cleantest1 (name String, id Int)
+        | STORED AS carbondata
+      """.stripMargin)
+
+    val segment0Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp +
+      CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '0'
+    val segment1Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp +
+      CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '1'
+    val segment2Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp +
+      CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '2'
+
+    sql(s"alter table cleantest1 add segment options('path'='$segment0Path'," +
+      s"'format'='carbon')").show()
+    sql(s"alter table cleantest1 add segment options('path'='$segment1Path'," +
+      s"'format'='carbon')").show()
+    sql(s"alter table cleantest1 add segment options('path'='$segment2Path'," +
+      s"'format'='carbon')").show()
+    sql(s"""INSERT INTO CLEANTEST SELECT * from cleantest1""")
+
+    // test after recovering data from trash
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(3)))
+
+    sql(s"CLEAN FILES FOR TABLE cleantest options('force'='true')").show
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+
+  test("clean up maintable table and test trash folder with SI with IN PROGRESS segments") {
+
+    sql("""DROP TABLE IF EXISTS CLEANTEST_WITHSI""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+    sql(
+      """
+        | CREATE TABLE CLEANTEST_WITHSI (id Int, name String, add String )
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(s"""INSERT INTO CLEANTEST_WITHSI SELECT 1,"abc","def"""")
+    sql(s"""INSERT INTO CLEANTEST_WITHSI SELECT 2, "abc","def"""")
+    sql(s"""INSERT INTO CLEANTEST_WITHSI SELECT 3, "abc","def"""")
+
+    sql(s"""CREATE INDEX SI_CLEANTEST on cleantest_withSI(add) as 'carbondata' """)
+
+    checkAnswer(sql(s"""select count(*) from cleantest_withSI"""),
+      Seq(Row(3)))
+    checkAnswer(sql(s"""select count(*) from si_cleantest"""),
+      Seq(Row(3)))
+
+    val mainTablePath = CarbonEnv.getCarbonTable(Some("default"), "cleantest_withsi")(sqlContext
+      .sparkSession).getTablePath
+    editTableStatusFile(mainTablePath)
+    val mainTableTrashFolderPath = mainTablePath + CarbonCommonConstants.FILE_SEPARATOR +
+      CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME
+
+    val siTablePath = CarbonEnv.getCarbonTable(Some("default"), "si_cleantest")(sqlContext
+      .sparkSession).getTablePath
+    editTableStatusFile(siTablePath)
+    val siTableTrashFolderPath = siTablePath + CarbonCommonConstants.FILE_SEPARATOR +
+      CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME
+
+    assert(!FileFactory.isFileExist(mainTableTrashFolderPath))
+    assert(!FileFactory.isFileExist(siTableTrashFolderPath))
+
+    val dryRun = sql(s"CLEAN FILES FOR TABLE cleantest_withsi OPTIONS('isDryRun'='true')").count()
+    // dry run shows 6 segments to move to trash. 3 for main table, 3 for si table
+    assert(dryRun == 6)
+
+    sql(s"CLEAN FILES FOR TABLE CLEANTEST_WITHSI").show()
+
+    checkAnswer(sql(s"""select count(*) from cleantest_withSI"""), Seq(Row(0)))
+    checkAnswer(sql(s"""select count(*) from si_cleantest"""), Seq(Row(0)))
+
+    assert(FileFactory.isFileExist(mainTableTrashFolderPath))
+    assert(FileFactory.isFileExist(siTableTrashFolderPath))
+
+    count = 0
+    var listMainTable = getFileCountInTrashFolder(mainTableTrashFolderPath)
+    assert(listMainTable == 6)
+
+    count = 0
+    var listSITable = getFileCountInTrashFolder(siTableTrashFolderPath)
+    assert(listSITable == 6)
+
+    val dryRun1 = sql(s"CLEAN FILES FOR TABLE cleantest_withsi OPTIONS('isDryRun'='true')").count()
+    // dry run shows 6 segments to move to trash. 3 for main table, 3 for si table
+    assert(dryRun1 == 6)
+    // recovering data from trash folder
+
+    val timeStamp = getTimestampFolderName(mainTableTrashFolderPath)
+
+
+    sql(
+      """
+        | CREATE TABLE cleantest1 (id Int, name String, add String )
+        | STORED AS carbondata
+      """.stripMargin)
+
+    val segment0Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR +
+      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '0'
+    val segment1Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR +
+      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '1'
+    val segment2Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR +
+      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '2'
+
+    sql(s"alter table cleantest1 add segment options('path'='$segment0Path'," +
+      s"'format'='carbon')").show()
+    sql(s"alter table cleantest1 add segment options('path'='$segment1Path'," +
+      s"'format'='carbon')").show()
+    sql(s"alter table cleantest1 add segment options('path'='$segment2Path'," +
+      s"'format'='carbon')").show()
+    sql(s"""INSERT INTO CLEANTEST_withsi SELECT * from cleantest1""")
+
+    checkAnswer(sql(s"""select count(*) from cleantest_withSI"""),
+      Seq(Row(3)))
+
+
+    sql(s"CLEAN FILES FOR TABLE cleantest_withsi options('force'='true')").show
+
+    // no files in trash anymore
+    count = 0
+    listMainTable = getFileCountInTrashFolder(mainTableTrashFolderPath)
+    assert(listMainTable == 0)
+
+    count = 0
+    listSITable = getFileCountInTrashFolder(siTableTrashFolderPath)
+    assert(listSITable == 0)
+
+    sql("show segments for table cleantest_withsi").show()
+    sql("show segments for table si_cleantest").show()
+
+    sql("""DROP TABLE IF EXISTS CLEANTEST_WITHSI""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+
+  }
+
+
+  test("clean up table and test trash folder with Marked For Delete segments") {
+    // do not send MFD folders to trash
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+    sql(
+      """
+        | CREATE TABLE cleantest (name String, id Int)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+
+    val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR +
+      CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"""Delete from table cleantest where segment.id in(1)""")
+    val dryRun = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('isDryRun'='true')").count()
+    // dry run shows 1 Marked For Delete segment to be deleted
+    assert(dryRun == 1)
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+
+  test("clean up table and test trash folder with compaction") {
+    // do not send compacted folders to trash
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+    sql(
+      """
+        | CREATE TABLE cleantest (name String, id Int)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""ALTER TABLE CLEANTEST COMPACT "MINOR" """)
+
+    val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR +
+      CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    val dryRun = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('isDryRun'='true')").count()
+    // dry run shows 4 compacted segments to be deleted
+    assert(dryRun == 4)
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+
+  test("clean up table and test trash folder with stale segments") {
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+    sql(
+      """
+        | CREATE TABLE cleantest (name String, id Int)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 2""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 2""")
+    val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR +
+      CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    // First 3 segments are made as stale segments, they should be moved to the trash folder
+    createStaleSegments(path)
+    val dryRun = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('isDryRun'='true')").count()
+   // sql(s"""INSERT INTO CLEANTEST SELECT "abc", 3""")
+    assert(dryRun == 3)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 6)
+
+    val dryRun1 = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('isDryRun'='true')").count()
+    assert(dryRun1 == 3)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+
+  test("clean up table and test trash folder with partition table") {
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+
+    sql(
+      """
+        | CREATE TABLE CLEANTEST (id Int, id1 INT ) PARTITIONED BY (add String)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"abc"""")
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"abc"""")
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"adc"""")
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"adc"""")
+    val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR +
+      CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME
+    assert(!FileFactory.isFileExist(trashFolderPath))
+
+    editTableStatusFile(path)
+
+    val dryRun = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('isDryRun'='true')").count()
+    // dry run shows 4 segments to move to trash
+    assert(dryRun == 4)
+
+
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 8)
+
+    val dryRun1 = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('isDryRun'='true')").count()
+    // dry run shows 4 segments to move to trash
+    assert(dryRun1 == 4)
+
+    // try recovering data and do select count(*)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    count = 0
+    // no carbondata file is in the trash now, everything has been deleted
+    list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 0)
+
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+
+
+
+  test("clean up table and test trash folder with stale segments in partition table") {
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+
+    sql(
+      """
+        | CREATE TABLE CLEANTEST (id Int, id1 INT ) PARTITIONED BY (add String)
+        | STORED AS carbondata
+      """.stripMargin)
+
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"abc"""")
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"abc"""")
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"adc"""")
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"adc"""")
+
+    val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR +
+      CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    // All 4  segments are made as stale segments, they should be moved to the trash folder
+    createStaleSegments(path)
+
+    val dryRun = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('isDryRun'='true')").count()
+    // dry run shows 3 segments to move to trash
+    assert(dryRun == 3)
+
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 8)
+
+    val dryRun1 = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('isDryRun'='true')").count()
+    // dry run shows 4 segments to move to trash
+    assert(dryRun1 == 4)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+

Review comment:
       removed
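
The tests above reset a class-level `count` to 0 before each call to `getFileCountInTrashFolder`, which suggests the helper accumulates into shared state (its body is not shown in this thread, so that is an assumption). A minimal sketch of a counter that returns the total without any shared variable is given below. The class and method names are hypothetical, and it works on the local file system through `java.nio.file` rather than through CarbonData's `FileFactory`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

final class TrashCountSketch {
  // Recursively counts regular files under dir. Returns 0 when the directory
  // does not exist, matching the tests that expect 0 for a missing trash folder.
  static long countFiles(Path dir) throws IOException {
    if (!Files.isDirectory(dir)) {
      return 0L;
    }
    // Files.walk holds open directory handles, so the stream must be closed.
    try (Stream<Path> paths = Files.walk(dir)) {
      return paths.filter(Files::isRegularFile).count();
    }
  }

  public static void main(String[] args) throws IOException {
    Path trash = Files.createTempDirectory("trash");
    Files.createFile(Files.createDirectories(trash.resolve("ts").resolve("seg0")).resolve("a"));
    System.out.println(countFiles(trash));
  }
}
```

Because the result is a return value, callers would not need a `count = 0` line before each assertion.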





[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata


vikramahuja1001 commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r513205026



##########
File path: processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
##########
@@ -149,9 +132,171 @@ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
       } finally {
         carbonTableStatusLock.unlock();
       }
+    } else {
+
+      int retryCount = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+          CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+      int maxTimeout = CarbonLockUtil
+          .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+          CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
+      ICarbonLock carbonTableStatusLock = CarbonLockFactory
+          .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK);
+
+      try {
+        if (carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout)) {
+

Review comment:
       done

##########
File path: processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
##########
@@ -149,9 +132,171 @@ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
       } finally {
         carbonTableStatusLock.unlock();
       }
+    } else {
+

Review comment:
       done
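
For reference, the acquire-with-retries and release-in-finally shape used in this hunk can be sketched in plain Java. This is only a sketch under stated assumptions: `LockRetrySketch`, `lockWithRetries` and `runLocked` are hypothetical names, and `java.util.concurrent.locks.ReentrantLock` stands in for CarbonData's `ICarbonLock`, whose behaviour is not reproduced here.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical sketch: ReentrantLock stands in for ICarbonLock (assumption).
final class LockRetrySketch {

  // Tries to take the lock up to retryCount times, waiting timeoutMillis per attempt.
  static boolean lockWithRetries(ReentrantLock lock, int retryCount, long timeoutMillis) {
    for (int attempt = 0; attempt < retryCount; attempt++) {
      try {
        if (lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
          return true;
        }
      } catch (InterruptedException e) {
        // Restore the interrupt flag and give up instead of retrying.
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return false;
  }

  // Runs the action under the lock and unlocks only if the lock was acquired.
  static <T> T runLocked(ReentrantLock lock, int retryCount, long timeoutMillis,
      Supplier<T> action) {
    if (!lockWithRetries(lock, retryCount, timeoutMillis)) {
      throw new IllegalStateException(
          "could not acquire lock after " + retryCount + " attempts");
    }
    try {
      return action.get();
    } finally {
      lock.unlock();
    }
  }

  public static void main(String[] args) {
    ReentrantLock lock = new ReentrantLock();
    String result = runLocked(lock, 3, 100L, () -> "table status read");
    System.out.println(result + ", still locked: " + lock.isLocked());
  }
}
```

The point of the design is that the failure path throws before the try block is entered, so unlock is never called on a lock that was not acquired.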





[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata


vikramahuja1001 commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r513205119



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
##########
@@ -80,40 +113,98 @@ case class CarbonCleanFilesCommand(
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    // if insert overwrite in progress, do not allow delete segment
-    if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
-      throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
+    if (!isDryRun) {
+      // if insert overwrite in progress, do not allow delete segment
+      if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
+        throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
+      }
+      val operationContext = new OperationContext
+      val cleanFilesPreEvent: CleanFilesPreEvent =
+        CleanFilesPreEvent(carbonTable,
+          sparkSession)
+      OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent, operationContext)
+      if (tableName.isDefined) {
+        Checker.validateTableExists(databaseNameOp, tableName.get, sparkSession)
+        if (forceTrashClean) {
+          CleanFilesUtil.deleteDataFromTrashFolder(carbonTable, sparkSession)
+        } else {
+          // clear trash based on timestamp
+          CleanFilesUtil.deleteDataFromTrashFolderByTimeStamp(carbonTable, sparkSession)
+        }
+        if (forceTableClean) {
+          deleteAllData(sparkSession, databaseNameOp, tableName.get)
+        } else {
+          cleanGarbageData(sparkSession, databaseNameOp, tableName.get)
+        }
+        // delete partial load and send them to trash
+        TableProcessingOperations
+          .deletePartialLoadDataIfExist(carbonTable, false)
+        // clean stash in metadata folder too
+        deleteStashInMetadataFolder(carbonTable)
+      } else {
+        cleanGarbageDataInAllTables(sparkSession)
+      }
+      if (cleanFileCommands != null) {
+        cleanFileCommands.foreach(_.processData(sparkSession))
+      }
+      val cleanFilesPostEvent: CleanFilesPostEvent =
+        CleanFilesPostEvent(carbonTable, sparkSession)
+      OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent, operationContext)
+      Seq.empty
+    } else if (isDryRun && tableName.isDefined) {
+      // dry run, do not clean anything and do not delete trash too
+      CleanFilesUtil.cleanFilesDryRun(carbonTable, sparkSession)
+    }
+    else {
+      Seq.empty
     }
-    val operationContext = new OperationContext
-    val cleanFilesPreEvent: CleanFilesPreEvent =
-      CleanFilesPreEvent(carbonTable,
-        sparkSession)
-    OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent, operationContext)
-    if (tableName.isDefined) {
-      Checker.validateTableExists(databaseNameOp, tableName.get, sparkSession)
-      if (forceTableClean) {
-        deleteAllData(sparkSession, databaseNameOp, tableName.get)
+  }
+
+  def deleteStashInMetadataFolder(carbonTable: CarbonTable): Unit = {
+    val tableStatusLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
+    val carbonLoadModel = new CarbonLoadModel
+    try {
+      if (tableStatusLock.lockWithRetries()) {
+        val tableStatusFilePath = CarbonTablePath
+          .getTableStatusFilePath(carbonTable.getTablePath)
+        val loadMetaDataDetails = SegmentStatusManager
+          .readTableStatusFile(tableStatusFilePath).filter(details => details.getSegmentStatus ==
+          SegmentStatus.SUCCESS || details.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS)
+          .sortWith(_.getLoadName < _.getLoadName)
+        carbonLoadModel.setLoadMetadataDetails(loadMetaDataDetails.toList.asJava)
       } else {
-        cleanGarbageData(sparkSession, databaseNameOp, tableName.get)
+        throw new ConcurrentOperationException(carbonTable.getDatabaseName,
+          carbonTable.getTableName, "table status read", "clean files command")
       }
-    } else {
-      cleanGarbageDataInAllTables(sparkSession)
+    } finally {
+      tableStatusLock.unlock()
     }
-    if (cleanFileCommands != null) {
-      cleanFileCommands.foreach(_.processData(sparkSession))
+    val loadMetaDataDetails = carbonLoadModel.getLoadMetadataDetails.asScala
+    val segmentFileList = loadMetaDataDetails.map(f => CarbonTablePath.getSegmentFilesLocation(
+      carbonTable.getTablePath) + CarbonCommonConstants.FILE_SEPARATOR + f.getSegmentFile)
+
+    val metaDataPath = CarbonTablePath.getMetadataPath(carbonTable.getTablePath) +
+      CarbonCommonConstants.FILE_SEPARATOR + "segments"

Review comment:
       done





[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata


vikramahuja1001 commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r513209768



##########
File path: core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
##########
@@ -47,6 +47,7 @@
   public static final String BATCH_PREFIX = "_batchno";
   private static final String LOCK_DIR = "LockFiles";
 
+  public static final String SEGMENTS_METADATA_FOLDER = "segments";

Review comment:
       done
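
As an illustration of why a shared constant helps here, the following plain-Java sketch builds the segment-files location from one named constant instead of repeating the literal "segments" at each call site. The class `SegmentsPathSketch`, the "/" separator and the "Metadata" directory name are assumptions made for the example; only the constant name `SEGMENTS_METADATA_FOLDER` and the method name `getSegmentFilesLocation` come from the diffs in this thread, and the real `CarbonTablePath` may differ.

```java
// Hypothetical sketch; it mirrors names from the diff but is not CarbonTablePath.
final class SegmentsPathSketch {
  static final String FILE_SEPARATOR = "/";                      // assumption
  static final String METADATA_DIR = "Metadata";                 // assumption
  static final String SEGMENTS_METADATA_FOLDER = "segments";     // name from the diff

  // Returns <tablePath>/Metadata
  static String getMetadataPath(String tablePath) {
    return tablePath + FILE_SEPARATOR + METADATA_DIR;
  }

  // Returns <tablePath>/Metadata/segments, built from the shared constant.
  static String getSegmentFilesLocation(String tablePath) {
    return getMetadataPath(tablePath) + FILE_SEPARATOR + SEGMENTS_METADATA_FOLDER;
  }

  public static void main(String[] args) {
    System.out.println(getSegmentFilesLocation("/store/default/cleantest"));
  }
}
```

Callers that need the folder can then go through the helper, so a later rename touches a single line.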





[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata


vikramahuja1001 commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r513214833



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
##########
@@ -80,40 +113,98 @@ case class CarbonCleanFilesCommand(
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    // if insert overwrite in progress, do not allow delete segment
-    if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
-      throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
+    if (!isDryRun) {
+      // if insert overwrite in progress, do not allow delete segment
+      if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
+        throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
+      }
+      val operationContext = new OperationContext
+      val cleanFilesPreEvent: CleanFilesPreEvent =
+        CleanFilesPreEvent(carbonTable,
+          sparkSession)
+      OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent, operationContext)
+      if (tableName.isDefined) {
+        Checker.validateTableExists(databaseNameOp, tableName.get, sparkSession)
+        if (forceTrashClean) {
+          CleanFilesUtil.deleteDataFromTrashFolder(carbonTable, sparkSession)
+        } else {
+          // clear trash based on timestamp
+          CleanFilesUtil.deleteDataFromTrashFolderByTimeStamp(carbonTable, sparkSession)
+        }
+        if (forceTableClean) {
+          deleteAllData(sparkSession, databaseNameOp, tableName.get)
+        } else {
+          cleanGarbageData(sparkSession, databaseNameOp, tableName.get)
+        }
+        // delete partial load and send them to trash
+        TableProcessingOperations
+          .deletePartialLoadDataIfExist(carbonTable, false)
+        // clean stash in metadata folder too
+        deleteStashInMetadataFolder(carbonTable)
+      } else {
+        cleanGarbageDataInAllTables(sparkSession)
+      }
+      if (cleanFileCommands != null) {
+        cleanFileCommands.foreach(_.processData(sparkSession))
+      }
+      val cleanFilesPostEvent: CleanFilesPostEvent =
+        CleanFilesPostEvent(carbonTable, sparkSession)
+      OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent, operationContext)
+      Seq.empty
+    } else if (isDryRun && tableName.isDefined) {
+      // dry run, do not clean anything and do not delete trash too
+      CleanFilesUtil.cleanFilesDryRun(carbonTable, sparkSession)
+    }
+    else {
+      Seq.empty
     }
-    val operationContext = new OperationContext
-    val cleanFilesPreEvent: CleanFilesPreEvent =
-      CleanFilesPreEvent(carbonTable,
-        sparkSession)
-    OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent, operationContext)
-    if (tableName.isDefined) {
-      Checker.validateTableExists(databaseNameOp, tableName.get, sparkSession)
-      if (forceTableClean) {
-        deleteAllData(sparkSession, databaseNameOp, tableName.get)
+  }
+
+  def deleteStashInMetadataFolder(carbonTable: CarbonTable): Unit = {

Review comment:
       done





[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata


vikramahuja1001 commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r513215028



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
##########
@@ -80,40 +113,98 @@ case class CarbonCleanFilesCommand(
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    // if insert overwrite in progress, do not allow delete segment
-    if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
-      throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
+    if (!isDryRun) {
+      // if insert overwrite in progress, do not allow delete segment
+      if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
+        throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
+      }
+      val operationContext = new OperationContext
+      val cleanFilesPreEvent: CleanFilesPreEvent =
+        CleanFilesPreEvent(carbonTable,
+          sparkSession)

Review comment:
       done





[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata


vikramahuja1001 commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r513215239



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
##########
@@ -80,40 +113,98 @@ case class CarbonCleanFilesCommand(
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    // if insert overwrite in progress, do not allow delete segment
-    if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
-      throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
+    if (!isDryRun) {
+      // if insert overwrite in progress, do not allow delete segment
+      if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
+        throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
+      }
+      val operationContext = new OperationContext
+      val cleanFilesPreEvent: CleanFilesPreEvent =
+        CleanFilesPreEvent(carbonTable,
+          sparkSession)
+      OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent, operationContext)
+      if (tableName.isDefined) {
+        Checker.validateTableExists(databaseNameOp, tableName.get, sparkSession)
+        if (forceTrashClean) {
+          CleanFilesUtil.deleteDataFromTrashFolder(carbonTable, sparkSession)
+        } else {
+          // clear trash based on timestamp
+          CleanFilesUtil.deleteDataFromTrashFolderByTimeStamp(carbonTable, sparkSession)
+        }
+        if (forceTableClean) {
+          deleteAllData(sparkSession, databaseNameOp, tableName.get)
+        } else {
+          cleanGarbageData(sparkSession, databaseNameOp, tableName.get)
+        }
+        // delete partial load and send them to trash
+        TableProcessingOperations
+          .deletePartialLoadDataIfExist(carbonTable, false)
+        // clean stash in metadata folder too
+        deleteStashInMetadataFolder(carbonTable)
+      } else {
+        cleanGarbageDataInAllTables(sparkSession)
+      }
+      if (cleanFileCommands != null) {
+        cleanFileCommands.foreach(_.processData(sparkSession))
+      }
+      val cleanFilesPostEvent: CleanFilesPostEvent =
+        CleanFilesPostEvent(carbonTable, sparkSession)
+      OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent, operationContext)
+      Seq.empty
+    } else if (isDryRun && tableName.isDefined) {
+      // dry run, do not clean anything and do not delete trash too
+      CleanFilesUtil.cleanFilesDryRun(carbonTable, sparkSession)
+    }
+    else {

Review comment:
       done
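
The three-way branch in processData (real run, dry run with a table, dry run without a table) can be sketched in plain Java as below. Everything in it is hypothetical: `CleanFilesDispatchSketch`, its parameters and the string log stand in for the Spark command, the table metadata and the file system work. Because the first branch already handles the case where the dry-run flag is false, the second branch only needs to check whether a table name is present.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of the dry-run dispatch; not the CarbonData command itself.
final class CleanFilesDispatchSketch {

  // Returns the rows to show the user; a real run returns none, a dry run lists
  // what would be moved. The log list records the side effects of a real run.
  static List<String> processData(boolean isDryRun, Optional<String> tableName,
      List<String> staleSegments, List<String> log) {
    if (!isDryRun) {
      if (tableName.isPresent()) {
        for (String segment : staleSegments) {
          log.add("moved to trash: " + segment);
        }
      } else {
        log.add("cleaned garbage data in all tables");
      }
      return Collections.emptyList();
    }
    if (tableName.isPresent()) {
      // Dry run: report only, no side effects.
      return new ArrayList<>(staleSegments);
    }
    return Collections.emptyList();
  }

  public static void main(String[] args) {
    List<String> log = new ArrayList<>();
    List<String> stale = List.of("0", "1", "2");
    System.out.println(processData(true, Optional.of("cleantest"), stale, log).size());
    System.out.println(log.isEmpty());
  }
}
```

Returning early from each branch keeps the dry-run path visibly free of side effects, which is the property the test in this PR asserts on the trash folder.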





[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata


vikramahuja1001 commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r513217211



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import java.sql.Timestamp
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}

Review comment:
       done





[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata


vikramahuja1001 commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r513217586



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import java.sql.Timestamp
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableClean <true> and clean garbage segment
+   * (MARKED_FOR_DELETE state) if forceTableClean <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> for force clean it will delete all data
+   *                               <false> it will clean garbage segment (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive Partitions  details
+   */
+  def cleanFiles(
+    dbName: String,
+    tableName: String,
+    tablePath: String,
+    carbonTable: CarbonTable,
+    forceTableClean: Boolean,
+    currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+    truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)
+        }
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+        currentTablePartitions match {
+          case Some(partitions) =>
+            SegmentFileStore.cleanSegments(
+              carbonTable,
+              currentTablePartitions.map(_.asJava).orNull,
+              true)
+          case _ =>
+        }
+      }
+    } finally {
+      if (currentTablePartitions.equals(None)) {
+        cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec])
+      } else {
+        cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList)
+      }
+
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+
+  /**
+   * delete partition folders recursively
+   *
+   * @param carbonTable
+   * @param partitionSpecList
+   */
+  def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable,
+      partitionSpecList: List[PartitionSpec]): Unit = {
+    if (carbonTable != null && carbonTable.isHivePartitionTable) {
+      val loadMetadataDetails = SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetadataPath)
+
+      val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath)
+
+      // list all files from table path
+      val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
+      loadMetadataDetails.foreach { metadataDetail =>
+        if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
+          metadataDetail.getSegmentFile == null) {
+          val loadStartTime: Long = metadataDetail.getLoadStartTime
+          // delete all files of @loadStartTime from table path
+          cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime)
+          partitionSpecList.foreach {
+            partitionSpec =>
+              val partitionLocation = partitionSpec.getLocation
+              // For partition folder outside the tablePath
+              if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
+                val partitionCarbonFile = FileFactory
+                  .getCarbonFile(partitionLocation.toString)
+                // list all files from partitionLocation
+                val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true)
+                // delete all files of @loadStartTime from externalPath
+                cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime)
+              }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   *
+   * @param carbonFiles
+   * @param timestamp
+   */
+  private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],
+      timestamp: Long): Unit = {
+    carbonFiles.asScala.foreach { carbonFile =>
+        val filePath = carbonFile.getPath
+        val fileName = carbonFile.getName
+        if (CarbonTablePath.DataFileUtil.compareCarbonFileTimeStamp(fileName, timestamp)) {
+          FileFactory.deleteFile(filePath)
+        }
+    }
+  }
+
+  /**
+   * The in-progress segments which are in stale state will be marked as deleted
+   * when driver is initializing.
+   *
+   * @param databaseLocation
+   * @param dbName
+   */
+  def cleanInProgressSegments(databaseLocation: String, dbName: String): Unit = {
+    val loaderDriver = CarbonProperties.getInstance().
+      getProperty(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER,
+        CarbonCommonConstants.DATA_MANAGEMENT_DRIVER_DEFAULT).toBoolean
+    if (!loaderDriver) {
+      return
+    }
+    try {
+      if (FileFactory.isFileExist(databaseLocation)) {
+        val file = FileFactory.getCarbonFile(databaseLocation)
+        if (file.isDirectory) {
+          val tableFolders = file.listFiles()
+          tableFolders.foreach { tableFolder =>
+            if (tableFolder.isDirectory) {
+              val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR +
+               tableFolder.getName
+              val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableFolder.getName)
+              val tableStatusFile =
+                CarbonTablePath.getTableStatusFilePath(tablePath)
+              if (FileFactory.isFileExist(tableStatusFile)) {
+                try {
+                  val carbonTable = CarbonMetadata.getInstance
+                    .getCarbonTable(tableUniqueName)

Review comment:
       done





[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata


vikramahuja1001 commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r513218983



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import java.sql.Timestamp
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableClean <true> and clean garbage segment
+   * (MARKED_FOR_DELETE state) if forceTableClean <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> for force clean it will delete all data
+   *                               <false> it will clean garbage segment (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive Partitions  details
+   */
+  def cleanFiles(
+    dbName: String,
+    tableName: String,
+    tablePath: String,
+    carbonTable: CarbonTable,
+    forceTableClean: Boolean,
+    currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+    truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)
+        }
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+        currentTablePartitions match {
+          case Some(partitions) =>
+            SegmentFileStore.cleanSegments(
+              carbonTable,
+              currentTablePartitions.map(_.asJava).orNull,
+              true)
+          case _ =>
+        }
+      }
+    } finally {
+      if (currentTablePartitions.equals(None)) {
+        cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec])
+      } else {
+        cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList)
+      }
+
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+
+  /**
+   * delete partition folders recursively
+   *
+   * @param carbonTable
+   * @param partitionSpecList
+   */
+  def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable,
+      partitionSpecList: List[PartitionSpec]): Unit = {
+    if (carbonTable != null && carbonTable.isHivePartitionTable) {
+      val loadMetadataDetails = SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetadataPath)
+
+      val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath)
+
+      // list all files from table path
+      val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
+      loadMetadataDetails.foreach { metadataDetail =>
+        if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
+          metadataDetail.getSegmentFile == null) {
+          val loadStartTime: Long = metadataDetail.getLoadStartTime
+          // delete all files of @loadStartTime from table path
+          cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime)
+          partitionSpecList.foreach {
+            partitionSpec =>
+              val partitionLocation = partitionSpec.getLocation
+              // For partition folder outside the tablePath
+              if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
+                val partitionCarbonFile = FileFactory
+                  .getCarbonFile(partitionLocation.toString)
+                // list all files from partitionLocation
+                val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true)
+                // delete all files of @loadStartTime from externalPath
+                cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime)
+              }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   *
+   * @param carbonFiles
+   * @param timestamp
+   */

Review comment:
       done





[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r513220391



##########
File path: core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.CarbonFileException;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.log4j.Logger;
+
+public final class TrashUtil {

Review comment:
       done





[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r513225060



##########
File path: core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.CarbonFileException;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.log4j.Logger;
+
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+          LogServiceFactory.getLogService(CarbonUtil.class.getName());
+
+  /**
+   * The below method copies a complete file to the trash folder. Provide necessary
+   * timestamp and the segment number in the suffixToAdd  variable, so that the proper folder is
+   * created in the trash folder.
+   */
+  public static void copyDataToTrashFolderByFile(String carbonTablePath, String pathOfFileToCopy,
+      String suffixToAdd) {
+    String trashFolderPath = CarbonTablePath.getTrashFolderPath(carbonTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd;
+    try {
+      if (new File(pathOfFileToCopy).exists()) {
+        FileUtils.copyFileToDirectory(new File(pathOfFileToCopy), new File(trashFolderPath));
+        LOGGER.info("File: " + pathOfFileToCopy + " successfully copied to the trash folder: "
+                + trashFolderPath);
+      }
+    } catch (IOException e) {
+      LOGGER.error("Unable to copy " + pathOfFileToCopy + " to the trash folder", e);
+    }
+  }
+
+  /**
+   * The below method copies the complete segment folder to the trash folder. Provide necessary
+   * timestamp and the segment number in the suffixToAdd  variable, so that the proper folder is
+   * created in the trash folder.
+   */
+  public static void copyDataToTrashBySegment(CarbonFile path, String carbonTablePath,

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.CarbonFileException;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.log4j.Logger;
+
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+          LogServiceFactory.getLogService(CarbonUtil.class.getName());
+
+  /**
+   * The below method copies a complete file to the trash folder. Provide necessary
+   * timestamp and the segment number in the suffixToAdd  variable, so that the proper folder is
+   * created in the trash folder.
+   */

Review comment:
       done
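
For readers following the thread, the copy-to-trash step quoted in the hunk above can be sketched with only the JDK. This is a minimal illustration, not the code in the PR: the class name `TrashCopySketch`, the method `copyToTrash`, and the `.Trash` directory name are assumptions made for the example (the PR resolves the folder through `CarbonTablePath.getTrashFolderPath`), and `java.nio.file` is swapped in for the Commons IO call used in the diff.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper, for illustration only; not part of CarbonData.
final class TrashCopySketch {

  // Assumed trash directory name; the PR obtains the real path from CarbonTablePath.
  private static final String TRASH_DIR = ".Trash";

  /**
   * Copies one file into <tablePath>/.Trash/<suffix>/ and returns the path of the copy.
   * Returns null when the source is not an existing regular file, mirroring the
   * exists() guard in the quoted method.
   */
  public static Path copyToTrash(Path tablePath, Path fileToCopy, String suffix)
      throws IOException {
    if (!Files.isRegularFile(fileToCopy)) {
      return null;
    }
    // The suffix carries the timestamp and segment number, e.g. "<timestamp>/Segment_0".
    Path targetDir = tablePath.resolve(TRASH_DIR).resolve(suffix);
    Files.createDirectories(targetDir);
    return Files.copy(fileToCopy, targetDir.resolve(fileToCopy.getFileName()),
        StandardCopyOption.REPLACE_EXISTING);
  }
}
```

Calling `TrashCopySketch.copyToTrash(tablePath, dataFile, "1603000000000/Segment_0")` would place the copy under `.Trash/1603000000000/Segment_0/` inside the table directory. The sketch works on the default (local) file system only, so it says nothing about how the copy should behave on HDFS or object stores.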





[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r513229197



##########
File path: core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
##########
@@ -113,12 +116,24 @@ private static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTab
     SegmentUpdateStatusManager updateStatusManager =
         new SegmentUpdateStatusManager(carbonTable, currLoadDetails);
     for (final LoadMetadataDetails oneLoad : loadDetails) {
-      if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) {
+      if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete, carbonTable
+              .getAbsoluteTableIdentifier())) {
         try {
+          // if insert in progress, then move it to trash
+          if (oneLoad.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS && !carbonTable
+              .isHivePartitionTable()) {
+            // move this segment to trash
+            TrashUtil.copyDataToTrashBySegment(FileFactory.getCarbonFile(CarbonTablePath
+                .getFactDir(carbonTable.getTablePath()) + "/Part0/Segment_" + oneLoad

Review comment:
       done





[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r513243540



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableClean <true> and clean garbage segment
+   * (MARKED_FOR_DELETE state) if forceTableClean <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> for force clean it will delete all data
+   *                               <false> it will clean garbage segment (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive Partitions  details
+   */
+  def cleanFiles(
+                  dbName: String,
+                  tableName: String,
+                  tablePath: String,
+                  carbonTable: CarbonTable,
+                  forceTableClean: Boolean,
+                  currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+                  truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)
+        }
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+        currentTablePartitions match {
+          case Some(partitions) =>
+            SegmentFileStore.cleanSegments(
+              carbonTable,
+              currentTablePartitions.map(_.asJava).orNull,
+              true)
+          case _ =>
+        }
+      }
+    } finally {
+      if (currentTablePartitions.equals(None)) {
+        cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec])
+      } else {
+        cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList)
+      }
+
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+
+  /**
+   * delete partition folders recursively
+   *
+   * @param carbonTable
+   * @param partitionSpecList
+   */
+  def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable,
+                                         partitionSpecList: List[PartitionSpec]): Unit = {
+    if (carbonTable != null && carbonTable.isHivePartitionTable) {
+      val loadMetadataDetails = SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetadataPath)
+
+      val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath)
+
+      // list all files from table path
+      val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
+      loadMetadataDetails.foreach { metadataDetail =>
+        if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
+          metadataDetail.getSegmentFile == null) {
+          val loadStartTime: Long = metadataDetail.getLoadStartTime
+          // delete all files of @loadStartTime from table path
+          cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime)
+          partitionSpecList.foreach {
+            partitionSpec =>
+              val partitionLocation = partitionSpec.getLocation
+              // For partition folder outside the tablePath
+              if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
+                val partitionCarbonFile = FileFactory
+                  .getCarbonFile(partitionLocation.toString)
+                // list all files from partitionLocation
+                val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true)
+                // delete all files of @loadStartTime from externalPath
+                cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime)
+              }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   *
+   * @param carbonFiles
+   * @param timestamp
+   */
+  private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],
+                                       timestamp: Long): Unit = {
+    carbonFiles.asScala.foreach {
+      carbonFile =>
+        val filePath = carbonFile.getPath
+        val fileName = carbonFile.getName
+        if (CarbonTablePath.DataFileUtil.compareCarbonFileTimeStamp(fileName, timestamp)) {
+          // delete the file
+          FileFactory.deleteFile(filePath)
+        }
+    }
+  }
+
+  /**
+   * The in-progress segments which are in stale state will be marked as deleted
+   * when driver is initializing.
+   *
+   * @param databaseLocation
+   * @param dbName
+   */
+  def cleanInProgressSegments(databaseLocation: String, dbName: String): Unit = {
+    val loaderDriver = CarbonProperties.getInstance().
+      getProperty(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER,
+        CarbonCommonConstants.DATA_MANAGEMENT_DRIVER_DEFAULT).toBoolean
+    if (!loaderDriver) {
+      return
+    }
+    try {
+      if (FileFactory.isFileExist(databaseLocation)) {
+        val file = FileFactory.getCarbonFile(databaseLocation)
+        if (file.isDirectory) {
+          val tableFolders = file.listFiles()
+          tableFolders.foreach { tableFolder =>
+            if (tableFolder.isDirectory) {
+              val tablePath = databaseLocation +
+                CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+              val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableFolder.getName)
+              val tableStatusFile =
+                CarbonTablePath.getTableStatusFilePath(tablePath)
+              if (FileFactory.isFileExist(tableStatusFile)) {
+                try {
+                  val carbonTable = CarbonMetadata.getInstance
+                    .getCarbonTable(tableUniqueName)
+                  SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true, null)
+                } catch {
+                  case _: Exception =>
+                    LOGGER.warn(s"Error while cleaning table " +
+                      s"${tableUniqueName}")
+                }
+              }
+            }
+          }
+        }
+      }
+    } catch {
+      case s: java.io.FileNotFoundException =>
+        // Create folders and files.
+        LOGGER.error(s)
+    }
+  }
+
+  /**
+   * The below method deletes all the files and folders in the trash folders of all carbon tables
+   * in all databases
+   */
+  def deleteDataFromTrashFolderInAllTables(sparkSession: SparkSession): Unit = {
+    try {
+      val databases = sparkSession.sessionState.catalog.listDatabases()
+      databases.foreach(dbName => {
+        val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
+        if (FileFactory.isFileExist(databaseLocation)) {
+          val file = FileFactory.getCarbonFile(databaseLocation)
+          if (file.isDirectory) {
+            val tableFolders = file.listFiles()
+            tableFolders.foreach { tableFolder =>
+              if (tableFolder.isDirectory) {
+                val tablePath = databaseLocation +
+                  CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+                TrashUtil.deleteAllDataFromTrashFolder(tablePath)
+              }
+            }
+          }
+        }
+      })
+    } catch {
+      case e: Throwable =>
+        // catch all exceptions to avoid failure
+        LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+          .error("Failed to clear trash folder of all tables", e)
+    }
+  }
+
+
+  def cleanFilesDryRunOp(carbonTable: CarbonTable, sparkSession: SparkSession): Seq[Row] = {
+    // dry run for clean files command
+    // Clean files will remove compacted, Marked_for_delete, Insert in progress(stale) segments.
+
+    val tableStatusLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(carbonTable.getDatabaseName)

Review comment:
       removed





[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r513252427



##########
File path: core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
##########
@@ -138,8 +143,19 @@ public boolean accept(CarbonFile file) {
               if (filesToBeDeleted.length == 0) {
                 status = true;
               } else {
-
                 for (CarbonFile eachFile : filesToBeDeleted) {
+                  // If the file to be deleted is a carbondata file, index file, index merge file
+                  // or a delta file, copy that file to the trash folder.
+                  if ((eachFile.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT) ||

Review comment:
       changed, it's not needed anymore





[GitHub] [carbondata] CarbonDataQA1 commented on pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata

GitBox
In reply to this post by GitBox

CarbonDataQA1 commented on pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#issuecomment-717879695


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4717/
   

