[GitHub] [carbondata] vikramahuja1001 opened a new pull request #4005: [WIP] Trash Folder support in carbondata

classic Classic list List threaded Threaded
147 messages Options
12345678
Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox

vikramahuja1001 commented on a change in pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#discussion_r531550965



##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
##########
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.cleanfiles
+
+import java.io.{File, PrintWriter}
+
+import scala.io.Source
+
+import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterAll {
+
+  var count = 0
+
+  test("clean up table and test trash folder with IN PROGRESS segments") {
+    // in-progress segments are expected to be cleaned without going through the trash
+    createParitionTable()
+    loadData()
+    val tablePath = CarbonEnv
+      .getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashDir = tablePath + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.TRASH_DIR
+    // rewrite the table status so that every "Success" entry reads "In Progress"
+    editTableStatusFile(tablePath)
+    assert(!FileFactory.isFileExist(trashDir))
+    val segmentsBeforeClean = sql(s"""show segments for table cleantest""").count()
+    assert(segmentsBeforeClean == 4)
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    val segmentsAfterClean = sql(s"""show segments for table cleantest""").count()
+    assert(0 == segmentsAfterClean)
+    assert(!FileFactory.isFileExist(trashDir))
+    // the trash must not have received any carbondata file
+    assert(getFileCountInTrashFolder(trashDir) == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+  test("clean up table and test trash folder with Marked For Delete segments") {
+    // marked-for-delete segments are expected to be removed without a copy in the trash
+    createParitionTable()
+    loadData()
+    val tablePath = CarbonEnv
+      .getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashDir = tablePath + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.TRASH_DIR
+    assert(!FileFactory.isFileExist(trashDir))
+    sql(s"""Delete from table cleantest where segment.id in(1)""")
+    val segmentsBeforeClean = sql(s"""show segments for table cleantest""").count()
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    val segmentsAfterClean = sql(s"""show segments for table cleantest""").count()
+    // exactly the one deleted segment disappears from the listing
+    assert(segmentsBeforeClean == segmentsAfterClean + 1)
+    assert(!FileFactory.isFileExist(trashDir))
+    count = 0
+    // the trash must not have received any carbondata file
+    assert(getFileCountInTrashFolder(trashDir) == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+  test("clean up table and test trash folder with compaction") {
+    // compacted segments are expected to be removed without a copy in the trash
+    createParitionTable()
+    loadData()
+    sql(s"""ALTER TABLE CLEANTEST COMPACT "MINOR" """)
+
+    val tablePath = CarbonEnv
+      .getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashDir = tablePath + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.TRASH_DIR
+    assert(!FileFactory.isFileExist(trashDir))
+
+    val segmentsBeforeClean = sql(s"""show segments for table cleantest""").count()
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    val segmentsAfterClean = sql(s"""show segments for table cleantest""").count()
+    // the four segments merged by the minor compaction disappear from the listing
+    assert(segmentsBeforeClean == segmentsAfterClean + 4)
+    assert(!FileFactory.isFileExist(trashDir))
+    count = 0
+    // the trash must not have received any carbondata file
+    assert(getFileCountInTrashFolder(trashDir) == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+
+
+  test("test trash folder with 2 segments with same segment number") {
+    createParitionTable()
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"hello","abc"""")
+
+    val tablePath = CarbonEnv
+      .getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashDir = tablePath + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.TRASH_DIR
+    assert(!FileFactory.isFileExist(trashDir))
+    // the first load is made stale before it is cleaned
+    deleteTableStatusFile(tablePath)
+
+    assert(!FileFactory.isFileExist(trashDir))
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    val filesAfterFirstClean = getFileCountInTrashFolder(trashDir)
+    assert(filesAfterFirstClean == 2)
+
+    // load once more and make that load stale as well
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"hello","abc"""")
+    deleteTableStatusFile(tablePath)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    val filesAfterSecondClean = getFileCountInTrashFolder(trashDir)
+    assert(filesAfterSecondClean == 4)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    count = 0
+    val filesAfterForceClean = getFileCountInTrashFolder(trashDir)
+    // the forced clean is expected to leave nothing in the trash
+    assert(filesAfterForceClean == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+  test("clean up table and test trash folder with stale segments") {
+    sql("""DROP TABLE IF EXISTS C1""")
+    createParitionTable()
+    loadData()
+    val tablePath = CarbonEnv
+      .getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashDir = CarbonTablePath.getTrashFolderPath(tablePath)
+    // without the table status file the loaded segments become stale; clean files is
+    // expected to move them to the trash folder
+    deleteTableStatusFile(tablePath)
+
+    sql(s"CLEAN FILES FOR TABLE CLEANTEST").show()
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(0)))
+
+    val timestampDir = getTimestampFolderName(trashDir)
+    // recover each segment from the trash by reading it as an external table and
+    // inserting its rows back into the partition table
+    (0 to 3).foreach { segmentId =>
+      val segmentPath = trashDir + CarbonCommonConstants.FILE_SEPARATOR + timestampDir +
+        "/Segment_" + segmentId
+      sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segmentPath'")
+      sql("INSERT INTO cleantest select * from c1").show()
+      sql("drop table c1")
+    }
+
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(4)))
+
+    sql("""DROP TABLE IF EXISTS C1""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+
+  test("clean up table and test trash folder with stale segments part 2") {
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS C1""")
+
+    sql("create table cleantest(" +
+      "value int) partitioned by (name string, age int) stored as carbondata")
+    // three loads, two rows each
+    sql("insert into cleantest values (30, 'amy', 12), (40, 'bob', 13)")
+    sql("insert into cleantest values (30, 'amy', 20), (10, 'bob', 13)")
+    sql("insert into cleantest values (30, 'cat', 12), (40, 'dog', 13)")
+
+
+    val tablePath = CarbonEnv
+      .getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashDir = CarbonTablePath.getTrashFolderPath(tablePath)
+    // without the table status file the three loads become stale; clean files is
+    // expected to move them to the trash folder
+    deleteTableStatusFile(tablePath)
+
+    sql(s"CLEAN FILES FOR TABLE CLEANTEST").show()
+
+    val timestampDir = getTimestampFolderName(trashDir)
+    // recover each segment from the trash by reading it as an external table and
+    // inserting its rows back into the partition table
+    (0 to 2).foreach { segmentId =>
+      val segmentPath = trashDir + CarbonCommonConstants.FILE_SEPARATOR + timestampDir +
+        "/Segment_" + segmentId
+      sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segmentPath'")
+      sql("INSERT INTO cleantest select * from c1").show()
+      sql("drop table c1")
+    }
+
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(6)))
+    checkAnswer(sql(s"""select count(*) from cleantest where age=13"""),
+      Seq(Row(3)))
+
+    sql("""DROP TABLE IF EXISTS C1""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+  test("clean up maintable table and test trash folder with SI with stale segments") {
+    createParitionTable()
+    loadData()
+    sql(s"""CREATE INDEX SI_CLEANTEST on cleantest(name) as 'carbondata' """)
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(4)))
+    checkAnswer(sql(s"""select count(*) from si_cleantest"""),
+      Seq(Row(4)))
+
+    val tablePath = CarbonEnv
+      .getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    deleteTableStatusFile(tablePath)
+    val trashDir = tablePath + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.TRASH_DIR
+
+    assert(!FileFactory.isFileExist(trashDir))
+
+    sql(s"CLEAN FILES FOR TABLE CLEANTEST").show()
+    // the main table is expected to be empty while the index table keeps its rows
+    checkAnswer(sql(s"""select count(*) from cleantest"""), Seq(Row(0)))
+    checkAnswer(sql(s"""select count(*) from si_cleantest"""), Seq(Row(4)))
+
+    assert(FileFactory.isFileExist(trashDir))
+
+    count = 0
+    val filesAfterClean = getFileCountInTrashFolder(trashDir)
+    assert(filesAfterClean == 8)
+
+    // recovering data from trash folder: work out the four segment folders first
+    val timestampDir = getTimestampFolderName(trashDir)
+    val segmentPaths = (0 to 3).map { segmentId =>
+      trashDir + CarbonCommonConstants.FILE_SEPARATOR + timestampDir +
+        CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + segmentId
+    }
+
+    // read each segment as an external table and insert its rows back
+    segmentPaths.foreach { segmentPath =>
+      sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segmentPath'")
+      sql("INSERT INTO cleantest select * from c1").show()
+      sql("drop table c1")
+    }
+
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(4)))
+    sql(s"CLEAN FILES FOR TABLE cleantest options('force'='true')").show
+    // the forced clean is expected to leave nothing in the trash
+    count = 0
+    val filesAfterForceClean = getFileCountInTrashFolder(trashDir)
+    assert(filesAfterForceClean == 0)
+    sql("show segments for table cleantest").show()
+    sql("show segments for table si_cleantest").show()
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+  /**
+   * Rewrites the "tablestatus" file in the table's Metadata folder, replacing every
+   * occurrence of "Success" with "In Progress". The edited content is first written to a
+   * "tmp" file in the same folder, which is then renamed over the original file.
+   *
+   * @param carbonTablePath root path of the table whose tablestatus file is edited
+   */
+  def editTableStatusFile(carbonTablePath: String) : Unit = {
+    val metadataDir = carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + "Metadata"
+    // Original File
+    val statusFile = new File(metadataDir + CarbonCommonConstants.FILE_SEPARATOR + "tablestatus")
+    // Temporary File
+    val tempFile = new File(metadataDir + CarbonCommonConstants.FILE_SEPARATOR + "tmp")
+    val reader = Source.fromFile(statusFile)
+    try {
+      val writer = new PrintWriter(tempFile)
+      try {
+        // scalastyle:off println
+        reader.getLines
+          .map(_.replaceAll("Success", "In Progress"))
+          .foreach(line => writer.println(line))
+        // scalastyle:on println
+      } finally {
+        writer.close()
+      }
+    } finally {
+      // the Source keeps a file handle open until it is closed explicitly
+      reader.close()
+    }
+    // renameTo reports failure through its return value only; fail loudly rather than let
+    // the calling test continue with an unedited table status file
+    assert(tempFile.renameTo(statusFile), "could not replace tablestatus with the edited copy")
+  }
+
+
+  def getFileCountInTrashFolder(dirPath: String) : Int = {

Review comment:
       I don't understand; what do I have to do here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#discussion_r531551083



##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.cleanfiles
+
+import java.io.{File, PrintWriter}
+
+import scala.io.Source
+
+import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
+
+  var count = 0
+
+  test("clean up table and test trash folder with IN PROGRESS segments") {
+    // in-progress segments are expected to be cleaned without going through the trash
+    createTable()
+    loadData()
+    val tablePath = CarbonEnv
+      .getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashDir = CarbonTablePath.getTrashFolderPath(tablePath)
+    // rewrite the table status so that every "Success" entry reads "In Progress"
+    editTableStatusFile(tablePath)
+    assert(!FileFactory.isFileExist(trashDir))
+
+    val segmentsBeforeClean = sql(s"""show segments for table cleantest""").count()
+    assert(segmentsBeforeClean == 4)
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    val segmentsAfterClean = sql(s"""show segments for table cleantest""").count()
+    assert(0 == segmentsAfterClean)
+    assert(!FileFactory.isFileExist(trashDir))
+    // the trash must not have received any carbondata file
+    val filesInTrash = getFileCountInTrashFolder(trashDir)
+    assert(filesInTrash == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+  test("clean up table and test trash folder with Marked For Delete segments") {
+    // marked-for-delete segments are expected to be removed without a copy in the trash
+    createTable()
+    loadData()
+    val tablePath = CarbonEnv
+      .getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashDir = CarbonTablePath.getTrashFolderPath(tablePath)
+    assert(!FileFactory.isFileExist(trashDir))
+    sql(s"""Delete from table cleantest where segment.id in(1)""")
+    val segmentsBeforeClean = sql(s"""show segments for table cleantest""").count()
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    val segmentsAfterClean = sql(s"""show segments for table cleantest""").count()
+    // exactly the one deleted segment disappears from the listing
+    assert(segmentsBeforeClean == segmentsAfterClean + 1)
+    assert(!FileFactory.isFileExist(trashDir))
+    count = 0
+    // the trash must not have received any carbondata file
+    val filesInTrash = getFileCountInTrashFolder(trashDir)
+    assert(filesInTrash == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+  test("clean up table and test trash folder with compaction") {
+    // compacted segments are expected to be removed without a copy in the trash
+    createTable()
+    loadData()
+    sql(s"""ALTER TABLE CLEANTEST COMPACT "MINOR" """)
+
+    val tablePath = CarbonEnv
+      .getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashDir = CarbonTablePath.getTrashFolderPath(tablePath)
+    assert(!FileFactory.isFileExist(trashDir))
+
+    val segmentsBeforeClean = sql(s"""show segments for table cleantest""").count()
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    val segmentsAfterClean = sql(s"""show segments for table cleantest""").count()
+    // the four segments merged by the minor compaction disappear from the listing
+    assert(segmentsBeforeClean == segmentsAfterClean + 4)
+    assert(!FileFactory.isFileExist(trashDir))
+    count = 0
+    // the trash must not have received any carbondata file
+    val filesInTrash = getFileCountInTrashFolder(trashDir)
+    assert(filesInTrash == 0)
+
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+  test("clean up table and test trash folder with stale segments") {
+    createTable()
+    loadData()
+    sql(s"""alter table cleantest compact 'minor'""")
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 2, "name"""")
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(5)))
+    val tablePath = CarbonEnv
+      .getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashDir = CarbonTablePath.getTrashFolderPath(tablePath)
+    assert(!FileFactory.isFileExist(trashDir))
+    // without the table status file the remaining segments become stale and are expected
+    // to be moved to the trash by clean files
+    deleteTableStatusFile(tablePath)
+    assert(!FileFactory.isFileExist(trashDir))
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(0)))
+    count = 0
+    val filesAfterClean = getFileCountInTrashFolder(trashDir)
+    assert(filesAfterClean == 4)
+    val timestampDir = getTimestampFolderName(trashDir)
+    // recovering data from trash folder: the compacted segment 0.1 and the later segment 4
+    val segmentPaths = Seq("0.1", "4").map { segmentId =>
+      trashDir + CarbonCommonConstants.FILE_SEPARATOR + timestampDir +
+        CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + segmentId
+    }
+
+    segmentPaths.foreach { segmentPath =>
+      sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segmentPath'")
+      sql("INSERT INTO cleantest select * from c1").show()
+      sql("drop table c1")
+    }
+
+    // test after recovering data from trash
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(5)))
+
+    // a plain clean files is expected to leave the trash content untouched
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    val filesAfterSecondClean = getFileCountInTrashFolder(trashDir)
+    assert(filesAfterSecondClean == 4)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    count = 0
+    val filesAfterForceClean = getFileCountInTrashFolder(trashDir)
+    // the forced clean is expected to leave nothing in the trash
+    assert(filesAfterForceClean == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+  test("clean up maintable table and test trash folder with SI with stale segments") {
+    createTable()
+    loadData()
+    sql(s"""CREATE INDEX SI_CLEANTEST on cleantest(add) as 'carbondata' """)
+
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(4)))
+    checkAnswer(sql(s"""select count(*) from si_cleantest"""),
+      Seq(Row(4)))
+
+    val tablePath = CarbonEnv
+      .getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    deleteTableStatusFile(tablePath)
+    val trashDir = CarbonTablePath.getTrashFolderPath(tablePath)
+
+    assert(!FileFactory.isFileExist(trashDir))
+    sql(s"CLEAN FILES FOR TABLE CLEANTEST").show()
+    // the main table is expected to be empty while the index table keeps its rows
+    checkAnswer(sql(s"""select count(*) from cleantest"""), Seq(Row(0)))
+    checkAnswer(sql(s"""select count(*) from si_cleantest"""), Seq(Row(4)))
+
+    assert(FileFactory.isFileExist(trashDir))
+
+    count = 0
+    val filesAfterClean = getFileCountInTrashFolder(trashDir)
+    assert(filesAfterClean == 8)
+
+    // recovering data from trash folder: work out the four segment folders first
+    val timestampDir = getTimestampFolderName(trashDir)
+    val segmentPaths = (0 to 3).map { segmentId =>
+      trashDir + CarbonCommonConstants.FILE_SEPARATOR + timestampDir +
+        CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + segmentId
+    }
+
+    // read each segment as an external table and insert its rows back
+    segmentPaths.foreach { segmentPath =>
+      sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segmentPath'")
+      sql("INSERT INTO cleantest select * from c1").show()
+      sql("drop table c1")
+    }
+
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(4)))
+    sql(s"CLEAN FILES FOR TABLE cleantest options('force'='true')").show
+    // the forced clean is expected to leave nothing in the trash
+    count = 0
+    val filesAfterForceClean = getFileCountInTrashFolder(trashDir)
+    assert(filesAfterForceClean == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+  test("test trash folder with 2 segments with same segment number") {
+    createTable()
+    sql(s"""INSERT INTO CLEANTEST SELECT "1", 2, "name"""")
+
+    val tablePath = CarbonEnv
+      .getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashDir = CarbonTablePath.getTrashFolderPath(tablePath)
+    assert(!FileFactory.isFileExist(trashDir))
+    // the single load above is made stale; clean files is expected to move it to the trash
+    deleteTableStatusFile(tablePath)
+
+    assert(!FileFactory.isFileExist(trashDir))
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    val filesAfterFirstClean = getFileCountInTrashFolder(trashDir)
+    assert(filesAfterFirstClean == 2)
+
+    // load once more and make that load stale as well
+    sql(s"""INSERT INTO CLEANTEST SELECT "1", 2, "name"""")
+    deleteTableStatusFile(tablePath)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    val filesAfterSecondClean = getFileCountInTrashFolder(trashDir)
+    assert(filesAfterSecondClean == 4)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    count = 0
+    val filesAfterForceClean = getFileCountInTrashFolder(trashDir)
+    // the forced clean is expected to leave nothing in the trash
+    assert(filesAfterForceClean == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+  test("test carbon.trash.retenion.property") {
+    // run with the trash retention set to 0 days; the second CLEAN FILES below is then
+    // expected to remove what the first one moved to the trash
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS, "0")
+    try {
+      createTable()
+      loadData()
+      checkAnswer(sql(s"""select count(*) from cleantest"""),
+        Seq(Row(4)))
+      val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+        .getTablePath
+      val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
+      assert(!FileFactory.isFileExist(trashFolderPath))
+      // All 4 segments are made as stale segments and should be moved to trash
+      deleteTableStatusFile(path)
+      assert(!FileFactory.isFileExist(trashFolderPath))
+      sql(s"CLEAN FILES FOR TABLE cleantest").show()
+      checkAnswer(sql(s"""select count(*) from cleantest"""),
+        Seq(Row(0)))
+      count = 0
+      var list = getFileCountInTrashFolder(trashFolderPath)
+      assert(list == 8)
+      val timeStamp = getTimestampFolderName(trashFolderPath)
+
+      sql(s"CLEAN FILES FOR TABLE cleantest").show()
+      count = 0
+      list = getFileCountInTrashFolder(trashFolderPath)
+      assert(list == 0)
+
+      sql("""DROP TABLE IF EXISTS CLEANTEST""")
+      sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+    } finally {
+      // the property is global state: restore it even when an assertion above fails, so
+      // that the zero-day retention cannot leak into tests that run afterwards
+      CarbonProperties.getInstance()
+        .removeProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS)
+    }
+  }
+
+  /**
+   * Rewrites the "tablestatus" file in the table's Metadata folder, replacing every
+   * occurrence of "Success" with "In Progress". The edited content is first written to a
+   * "tmp" file in the same folder, which is then renamed over the original file.
+   *
+   * @param carbonTablePath root path of the table whose tablestatus file is edited
+   */
+  def editTableStatusFile(carbonTablePath: String) : Unit = {
+    val metadataDir = carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + "Metadata"
+    // Original File
+    val statusFile = new File(metadataDir + CarbonCommonConstants.FILE_SEPARATOR + "tablestatus")
+    // Temporary File
+    val tempFile = new File(metadataDir + CarbonCommonConstants.FILE_SEPARATOR + "tmp")
+    val reader = Source.fromFile(statusFile)
+    try {
+      val writer = new PrintWriter(tempFile)
+      try {
+        // scalastyle:off println
+        reader.getLines
+          .map(_.replaceAll("Success", "In Progress"))
+          .foreach(line => writer.println(line))
+        // scalastyle:on println
+      } finally {
+        writer.close()
+      }
+    } finally {
+      // the Source keeps a file handle open until it is closed explicitly
+      reader.close()
+    }
+    // renameTo reports failure through its return value only; fail loudly rather than let
+    // the calling test continue with an unedited table status file
+    assert(tempFile.renameTo(statusFile), "could not replace tablestatus with the edited copy")
+  }
+
+
+  def getFileCountInTrashFolder(dirPath: String) : Int = {

Review comment:
       I don't understand; what do I have to do here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#discussion_r531552014



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
##########
@@ -91,6 +96,14 @@ case class CarbonCleanFilesCommand(
     OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent, operationContext)
     if (tableName.isDefined) {
       Checker.validateTableExists(databaseNameOp, tableName.get, sparkSession)
+      if (forceClean) {
+        // empty the trash folder
+        TrashUtil.emptyTrash(carbonTable.getTablePath)
+      } else {
+        // clear trash based on timestamp
+        TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath)
+      }
+      CleanFilesUtil.cleanStaleSegments(carbonTable)

Review comment:
       I don't understand what you mean here. This is the first place where we clean the stale segments. We first delete the data in the trash based on timestamp expiration, then proceed with cleaning the stale segments, and then do the regular clean files operation for MFD/Compacted segments.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#issuecomment-734836172


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/4957/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

CarbonDataQA2 commented on pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#issuecomment-734838466


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3202/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] QiangCai commented on a change in pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

QiangCai commented on a change in pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#discussion_r532004928



##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
##########
@@ -2086,6 +2087,41 @@ public int getMaxSIRepairLimit(String dbName, String tableName) {
     return Math.abs(Integer.parseInt(thresholdValue));
   }
 
+  /**
+   * The below method returns the time(in milliseconds) for which timestamp folder retention in
+   * trash folder will take place.
+   */
+  public long getTrashFolderRetentionTime() {
+    String propertyValue = getProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS, Integer
+        .toString(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS_DEFAULT));
+    int configuredValue = 0;
+    try {
+      configuredValue = Integer.parseInt(propertyValue);
+      if (configuredValue < 0 || configuredValue > CarbonCommonConstants
+          .CARBON_TRASH_RETENTION_DAYS_MAXIMUM) {
+        LOGGER.warn("Value of " + CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS + " is" +
+            " invalid, taking default value instead");
+        configuredValue = CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS_DEFAULT;
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.error("Invalid value configured for " + CarbonCommonConstants
+          .CARBON_TRASH_RETENTION_DAYS + ", considering the default value");
+      configuredValue = CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS_DEFAULT;
+    }
+    long milliSecondsInADay = TimeUnit.DAYS.toMillis(1);
+    return (long) configuredValue * milliSecondsInADay;
+  }

Review comment:
       Please move the validation logic into a validateTrashFolderRetentionTime method; getTrashFolderRetentionTime should only return the value and should not write any log.

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the clean files command in carbondata. This class has methods for clean files
+ * operation.

Review comment:
       This util provides methods to clean stale data for the clean files command.

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {

Review comment:
             for (String staleSegmentFile : staleSegmentFiles) {
   

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = FileFactory.getCarbonFile(carbonTable.getTablePath() +

Review comment:
       CarbonFile segmentPath = CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), segmentNo)

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = FileFactory.getCarbonFile(carbonTable.getTablePath() +
+              CarbonCommonConstants.FILE_SEPARATOR + fileStore.getSegmentFile().getLocationMap()
+              .entrySet().iterator().next().getKey());
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentLocation, CarbonTablePath.getTrashFolderPath(
+              carbonTable.getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR +
+              timeStampForTrashFolder + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath
+              .SEGMENT_PREFIX + segmentNumber);
+          // Deleting the stale Segment folders.
+          try {
+            CarbonUtil.deleteFoldersAndFiles(segmentLocation);
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error("Unable to delete the segment: " + segmentNumber + " from after moving" +
+                " it to the trash folder. Please delete them manually : " + e.getMessage(), e);
+          }
+          // delete the segment file as well
+          FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+              staleSegment));
+        }
+      }
+      staleSegments.clear();
+    }
+  }
+
+  /**
+   * This method will clean all the stale segments for partition table, delete the source folders
+   * after copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegmentsForPartitionTable(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        // for each segment we get the indexfile first, then we get the carbondata file. Move both
+        // of those to trash folder
+        List<CarbonFile> filesToDelete = new ArrayList<>();
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
+            FileFactory.getConfiguration());
+        for (String indexfile : indexOrMergeFiles) {
+          // copy the index or merge file to the trash folder
+          TrashUtil.copyFileToTrashFolder(indexfile, CarbonTablePath.getTrashFolderPath(carbonTable
+              .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder +
+              CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX +
+              segmentNumber);
+          filesToDelete.add(FileFactory.getCarbonFile(indexfile));
+        }
+        // get carbondata files from here
+        Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
+        for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+          for (String file : entry.getValue()) {
+            // copy the carbondata file to trash
+            TrashUtil.copyFileToTrashFolder(file, CarbonTablePath.getTrashFolderPath(carbonTable
+                .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder
+                + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX
+                + segmentNumber);
+            filesToDelete.add(FileFactory.getCarbonFile(file));
+          }
+        }
+        // After every file of that segment has been copied, need to delete those files.
+        LOGGER.info("Segment number: " + segmentNumber + "has been successfully copied to the" +
+            " trash folder");
+        try {
+          for (CarbonFile file : filesToDelete) {
+            FileFactory.deleteFile(file.getAbsolutePath());

Review comment:
       file.delete()
   
   Please optimize your code so that deleteEmptyPartitionFolders is used as little as possible.

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = FileFactory.getCarbonFile(carbonTable.getTablePath() +
+              CarbonCommonConstants.FILE_SEPARATOR + fileStore.getSegmentFile().getLocationMap()
+              .entrySet().iterator().next().getKey());
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentLocation, CarbonTablePath.getTrashFolderPath(
+              carbonTable.getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR +
+              timeStampForTrashFolder + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath
+              .SEGMENT_PREFIX + segmentNumber);
+          // Deleting the stale Segment folders.
+          try {
+            CarbonUtil.deleteFoldersAndFiles(segmentLocation);
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error("Unable to delete the segment: " + segmentNumber + " from after moving" +
+                " it to the trash folder. Please delete them manually : " + e.getMessage(), e);
+          }
+          // delete the segment file as well
+          FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+              staleSegment));
+        }
+      }
+      staleSegments.clear();
+    }
+  }
+
+  /**
+   * This method will clean all the stale segments for partition table, delete the source folders
+   * after copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegmentsForPartitionTable(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        // for each segment we get the indexfile first, then we get the carbondata file. Move both
+        // of those to trash folder
+        List<CarbonFile> filesToDelete = new ArrayList<>();
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
+            FileFactory.getConfiguration());
+        for (String indexfile : indexOrMergeFiles) {
+          // copy the index or merge file to the trash folder
+          TrashUtil.copyFileToTrashFolder(indexfile, CarbonTablePath.getTrashFolderPath(carbonTable
+              .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder +
+              CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX +
+              segmentNumber);
+          filesToDelete.add(FileFactory.getCarbonFile(indexfile));
+        }
+        // get carbondata files from here
+        Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
+        for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+          for (String file : entry.getValue()) {
+            // copy the carbondata file to trash
+            TrashUtil.copyFileToTrashFolder(file, CarbonTablePath.getTrashFolderPath(carbonTable
+                .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder
+                + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX
+                + segmentNumber);
+            filesToDelete.add(FileFactory.getCarbonFile(file));
+          }
+        }
+        // After every file of that segment has been copied, need to delete those files.
+        LOGGER.info("Segment number: " + segmentNumber + "has been successfully copied to the" +
+            " trash folder");
+        try {
+          for (CarbonFile file : filesToDelete) {
+            FileFactory.deleteFile(file.getAbsolutePath());
+            // deleting empty  partition folders of partition table
+            SegmentFileStore.deleteEmptyPartitionFolders(FileFactory.getCarbonFile(new Path(file
+                .getAbsolutePath()).getParent().toString()));
+          }
+          filesToDelete.clear();
+        } catch (IOException e) {
+          LOGGER.error("Error while deleting the source data files. Please delete the files of" +
+              " segment: " + segmentNumber + " manually.", e);
+        }
+        // Delete the segment file too
+        FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+            staleSegment));
+      }
+      staleSegments.clear();
+    }
+  }
+
+  /**
+   * This method will find all the stale segments by comparing the segment files in the
+   * metadata directory with the segments in the table status file. Any segment which has entry
+   * in the metadata folder and is not present in the table status file is considered as a
+   * stale segment. Only comparing from tablestatus file, not checking tablestatus.history file
+   */
+  private static List<String> getStaleSegments(CarbonTable carbonTable) {

Review comment:
       getStaleSegments => getStaleSegmentFiles

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = System.currentTimeMillis();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = FileFactory.getCarbonFile(carbonTable.getTablePath() +
+              CarbonCommonConstants.FILE_SEPARATOR + fileStore.getSegmentFile().getLocationMap()
+              .entrySet().iterator().next().getKey());
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentLocation, CarbonTablePath.getTrashFolderPath(
+              carbonTable.getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR +
+              timeStampForTrashFolder + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath
+              .SEGMENT_PREFIX + segmentNumber);
+          // Deleting the stale Segment folders.
+          try {
+            CarbonUtil.deleteFoldersAndFiles(segmentLocation);
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error("Unable to delete the segment: " + segmentNumber + " from after moving" +
+                " it to the trash folder : " + e.getMessage(), e);
+          }
+          // delete the segment file as well
+          FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+              staleSegment));
+        }
+      }
+      staleSegments.clear();
+    }
+  }
+
+  /**
+   * This method will clean all the stale segments for partition table, delete the source folders
+   * after copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegmentsForPartitionTable(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = System.currentTimeMillis();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        // for each segment we get the indexfile first, then we get the carbondata file. Move both
+        // of those to trash folder
+        List<CarbonFile> filesToDelete = new ArrayList<>();
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
+            FileFactory.getConfiguration());
+        for (String file : indexOrMergeFiles) {
+          // copy the index or merge file to the trash folder
+          TrashUtil.copyFileToTrashFolder(file, CarbonTablePath.getTrashFolderPath(carbonTable
+              .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder +
+              CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX +
+              segmentNumber);
+          filesToDelete.add(FileFactory.getCarbonFile(file));
+        }
+        // get carbondata files from here
+        Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
+        for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+          for (String file : entry.getValue()) {
+            // copy the carbondata file to trash
+            TrashUtil.copyFileToTrashFolder(file, CarbonTablePath.getTrashFolderPath(carbonTable

Review comment:
       Agree with Akash.
   Collect all the file paths first, not CarbonFile objects.
   When a path is used, create its CarbonFile then, one by one.
   

##########
File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the trash folder in carbondata. This class has methods to copy data to the trash and
+ * remove data from the trash.
+ */
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(TrashUtil.class.getName());
+
+  /**
+   * Copies the bytes of one file to a destination path; on failure the error is logged and
+   * rethrown, and both streams are closed in every case.
+   * @param sourcePath the path from which to copy the file
+   * @param destinationPath the path where the file will be copied
+   * @throws IOException if opening either stream or copying the bytes fails
+   */
+  private static void copyToTrashFolder(String sourcePath, String destinationPath)
+      throws IOException {
+    DataOutputStream dataOutputStream = null;
+    DataInputStream dataInputStream = null;
+    try {
+      dataOutputStream = FileFactory.getDataOutputStream(destinationPath);
+      dataInputStream = FileFactory.getDataInputStream(sourcePath);
+      IOUtils.copyBytes(dataInputStream, dataOutputStream, CarbonCommonConstants.BYTEBUFFER_SIZE);
+    } catch (IOException exception) {
+      LOGGER.error("Unable to copy " + sourcePath + " to the trash folder", exception);
+      throw exception;
+    } finally {
+      CarbonUtil.closeStreams(dataInputStream, dataOutputStream);
+    }
+  }
+
+  /**
+   * Copies a single file into the given trash directory, creating the directory if needed.
+   * Nothing is copied when the source is missing or a same-named file is already in the trash.
+   * @param filePathToCopy path of the file to copy to the trash folder
+   * @param trashFolderWithTimestamp destination directory inside the trash folder
+   * @throws IOException if copying fails; the trash directory is deleted before rethrowing
+   */
+  public static void copyFileToTrashFolder(String filePathToCopy,
+      String trashFolderWithTimestamp) throws IOException {
+    CarbonFile carbonFileToCopy = FileFactory.getCarbonFile(filePathToCopy);
+    try {
+      if (carbonFileToCopy.exists()) {
+        if (!FileFactory.isFileExist(trashFolderWithTimestamp)) {
+          FileFactory.mkdirs(trashFolderWithTimestamp);
+        }
+        if (!FileFactory.isFileExist(trashFolderWithTimestamp + CarbonCommonConstants
+            .FILE_SEPARATOR + carbonFileToCopy.getName())) {
+          copyToTrashFolder(filePathToCopy, trashFolderWithTimestamp + CarbonCommonConstants
+              .FILE_SEPARATOR + carbonFileToCopy.getName());
+        }
+      }
+    } catch (IOException e) {
+      // in case there is any issue while copying the file to the trash folder, we need to delete
+      // the complete segment folder from the trash folder. The trashFolderWithTimestamp contains
+      // the segment folder too. Delete the folder as it is.
+      FileFactory.deleteFile(trashFolderWithTimestamp);
+      LOGGER.error("Error while creating trash folder or copying data to the trash folder", e);
+      throw e;
+    }
+  }
+
+  /**
+   * Copies the contents of a segment folder to the trash folder. The entries of the segment
+   * folder are listed and copied one by one via copyFileToTrashFolder.
+   *
+   * @param segmentPath the segment folder whose files are to be copied to the trash folder
+   * @param trashFolderWithTimestamp trash folder path with timestamp and segment number
+   * @throws IOException if listing the segment folder or copying any of its files fails
+   */
+  public static void copySegmentToTrash(CarbonFile segmentPath,
+      String trashFolderWithTimestamp) throws IOException {
+    try {
+      List<CarbonFile> dataFiles = FileFactory.getFolderList(segmentPath.getAbsolutePath());
+      for (CarbonFile carbonFile : dataFiles) {
+        copyFileToTrashFolder(carbonFile.getAbsolutePath(), trashFolderWithTimestamp);
+      }
+      LOGGER.info("Segment: " + segmentPath.getAbsolutePath() + " has been copied to" +
+          " the trash folder successfully");
+    } catch (IOException e) {
+      LOGGER.error("Error while getting folder list for the segment", e);
+      throw e;
+    }
+  }
+
+  /**
+   * The below method deletes timestamp subdirectories in the trash folder which have expired as
+   * per the user defined retention time
+   */
+  public static void deleteExpiredDataFromTrash(String tablePath) {
+    String trashPath = CarbonTablePath.getTrashFolderPath(tablePath);
+    // Deleting the timestamp based subdirectories in the trashfolder by the given timestamp.
+    try {
+      if (FileFactory.isFileExist(trashPath)) {
+        List<CarbonFile> timestampFolderList = FileFactory.getFolderList(trashPath);
+        for (CarbonFile timestampFolder : timestampFolderList) {
+          // If the timeStamp at which the timeStamp subdirectory has expired as per the user
+          // defined value, delete the complete timeStamp subdirectory
+          if (isTrashRetentionTimeoutExceeded(Long.parseLong(timestampFolder.getName()))) {
+            if (timestampFolder.isFileExist()) {

Review comment:
       No need to check existence again after listing the files.

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = FileFactory.getCarbonFile(carbonTable.getTablePath() +
+              CarbonCommonConstants.FILE_SEPARATOR + fileStore.getSegmentFile().getLocationMap()
+              .entrySet().iterator().next().getKey());
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentLocation, CarbonTablePath.getTrashFolderPath(
+              carbonTable.getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR +
+              timeStampForTrashFolder + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath
+              .SEGMENT_PREFIX + segmentNumber);
+          // Deleting the stale Segment folders.
+          try {
+            CarbonUtil.deleteFoldersAndFiles(segmentLocation);
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error("Unable to delete the segment: " + segmentNumber + " from after moving" +
+                " it to the trash folder. Please delete them manually : " + e.getMessage(), e);
+          }
+          // delete the segment file as well
+          FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+              staleSegment));
+        }
+      }
+      staleSegments.clear();

Review comment:
       No need to clear the list, right?

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];

Review comment:
       String segmentNo = DataFileUtil.getSegmentNoFromSegmentFile(staleSegmentFile)
   
   change all places

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = FileFactory.getCarbonFile(carbonTable.getTablePath() +
+              CarbonCommonConstants.FILE_SEPARATOR + fileStore.getSegmentFile().getLocationMap()
+              .entrySet().iterator().next().getKey());
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentLocation, CarbonTablePath.getTrashFolderPath(

Review comment:
       please extract the second parameter to a method

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = FileFactory.getCarbonFile(carbonTable.getTablePath() +
+              CarbonCommonConstants.FILE_SEPARATOR + fileStore.getSegmentFile().getLocationMap()
+              .entrySet().iterator().next().getKey());
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentLocation, CarbonTablePath.getTrashFolderPath(
+              carbonTable.getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR +
+              timeStampForTrashFolder + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath
+              .SEGMENT_PREFIX + segmentNumber);
+          // Deleting the stale Segment folders.
+          try {
+            CarbonUtil.deleteFoldersAndFiles(segmentLocation);
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error("Unable to delete the segment: " + segmentNumber + " from after moving" +
+                " it to the trash folder. Please delete them manually : " + e.getMessage(), e);

Review comment:
       if you ask the user to do it manually, better to print the path of the segment

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);

Review comment:
       change staleSegments to staleSegmentFiles

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * Cleans the stale segments of a table: each segment folder is copied to the trash, the source
+   * folder is deleted (a failed delete is only logged) and the stale .segment file is removed.
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = FileFactory.getCarbonFile(carbonTable.getTablePath() +
+              CarbonCommonConstants.FILE_SEPARATOR + fileStore.getSegmentFile().getLocationMap()
+              .entrySet().iterator().next().getKey());
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentLocation, CarbonTablePath.getTrashFolderPath(
+              carbonTable.getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR +
+              timeStampForTrashFolder + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath
+              .SEGMENT_PREFIX + segmentNumber);
+          // delete the source segment folder; a failure is logged and does not stop the cleanup
+          try {
+            CarbonUtil.deleteFoldersAndFiles(segmentLocation);
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error("Unable to delete the segment: " + segmentNumber + " from after moving" +
+                " it to the trash folder. Please delete them manually : " + e.getMessage(), e);
+          }
+          // delete the segment file as well
+          FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+              staleSegment));
+        }
+      }
+      staleSegments.clear();
+    }
+  }
+
+  /**
+   * This method will clean all the stale segments for partition table, delete the source folders
+   * after copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegmentsForPartitionTable(CarbonTable carbonTable)

Review comment:
       Please optimize this code in the same way as the previous method.

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {

Review comment:
       Please remove this line (line 55).

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = FileFactory.getCarbonFile(carbonTable.getTablePath() +
+              CarbonCommonConstants.FILE_SEPARATOR + fileStore.getSegmentFile().getLocationMap()
+              .entrySet().iterator().next().getKey());
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentLocation, CarbonTablePath.getTrashFolderPath(
+              carbonTable.getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR +
+              timeStampForTrashFolder + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath
+              .SEGMENT_PREFIX + segmentNumber);
+          // Deleting the stale Segment folders.
+          try {
+            CarbonUtil.deleteFoldersAndFiles(segmentLocation);
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error("Unable to delete the segment: " + segmentNumber + " from after moving" +
+                " it to the trash folder. Please delete them manually : " + e.getMessage(), e);
+          }
+          // delete the segment file as well
+          FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+              staleSegment));
+        }
+      }
+      staleSegments.clear();
+    }
+  }
+
+  /**
+   * This method will clean all the stale segments for partition table, delete the source folders
+   * after copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegmentsForPartitionTable(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        // for each segment we get the indexfile first, then we get the carbondata file. Move both
+        // of those to trash folder
+        List<CarbonFile> filesToDelete = new ArrayList<>();
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
+            FileFactory.getConfiguration());
+        for (String indexfile : indexOrMergeFiles) {
+          // copy the index or merge file to the trash folder
+          TrashUtil.copyFileToTrashFolder(indexfile, CarbonTablePath.getTrashFolderPath(carbonTable
+              .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder +
+              CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX +
+              segmentNumber);
+          filesToDelete.add(FileFactory.getCarbonFile(indexfile));
+        }
+        // get carbondata files from here
+        Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
+        for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+          for (String file : entry.getValue()) {
+            // copy the carbondata file to trash
+            TrashUtil.copyFileToTrashFolder(file, CarbonTablePath.getTrashFolderPath(carbonTable
+                .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder
+                + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX
+                + segmentNumber);
+            filesToDelete.add(FileFactory.getCarbonFile(file));
+          }
+        }
+        // After every file of that segment has been copied, need to delete those files.
+        LOGGER.info("Segment number: " + segmentNumber + "has been successfully copied to the" +
+            " trash folder");
+        try {
+          for (CarbonFile file : filesToDelete) {
+            FileFactory.deleteFile(file.getAbsolutePath());
+            // deleting empty  partition folders of partition table
+            SegmentFileStore.deleteEmptyPartitionFolders(FileFactory.getCarbonFile(new Path(file

Review comment:
       Change this so that it is invoked only once per partition folder.

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = FileFactory.getCarbonFile(carbonTable.getTablePath() +
+              CarbonCommonConstants.FILE_SEPARATOR + fileStore.getSegmentFile().getLocationMap()
+              .entrySet().iterator().next().getKey());
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentLocation, CarbonTablePath.getTrashFolderPath(
+              carbonTable.getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR +
+              timeStampForTrashFolder + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath
+              .SEGMENT_PREFIX + segmentNumber);
+          // Deleting the stale Segment folders.
+          try {
+            CarbonUtil.deleteFoldersAndFiles(segmentLocation);
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error("Unable to delete the segment: " + segmentNumber + " from after moving" +
+                " it to the trash folder. Please delete them manually : " + e.getMessage(), e);
+          }
+          // delete the segment file as well
+          FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+              staleSegment));
+        }
+      }
+      staleSegments.clear();
+    }
+  }
+
+  /**
+   * This method will clean all the stale segments for partition table, delete the source folders
+   * after copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegmentsForPartitionTable(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        // for each segment we get the indexfile first, then we get the carbondata file. Move both
+        // of those to trash folder
+        List<CarbonFile> filesToDelete = new ArrayList<>();
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
+            FileFactory.getConfiguration());
+        for (String indexfile : indexOrMergeFiles) {
+          // copy the index or merge file to the trash folder
+          TrashUtil.copyFileToTrashFolder(indexfile, CarbonTablePath.getTrashFolderPath(carbonTable
+              .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder +
+              CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX +
+              segmentNumber);
+          filesToDelete.add(FileFactory.getCarbonFile(indexfile));
+        }
+        // get carbondata files from here
+        Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
+        for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+          for (String file : entry.getValue()) {
+            // copy the carbondata file to trash
+            TrashUtil.copyFileToTrashFolder(file, CarbonTablePath.getTrashFolderPath(carbonTable
+                .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder
+                + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX
+                + segmentNumber);
+            filesToDelete.add(FileFactory.getCarbonFile(file));
+          }
+        }
+        // After every file of that segment has been copied, need to delete those files.
+        LOGGER.info("Segment number: " + segmentNumber + "has been successfully copied to the" +
+            " trash folder");
+        try {
+          for (CarbonFile file : filesToDelete) {
+            FileFactory.deleteFile(file.getAbsolutePath());
+            // deleting empty  partition folders of partition table
+            SegmentFileStore.deleteEmptyPartitionFolders(FileFactory.getCarbonFile(new Path(file
+                .getAbsolutePath()).getParent().toString()));
+          }
+          filesToDelete.clear();
+        } catch (IOException e) {
+          LOGGER.error("Error while deleting the source data files. Please delete the files of" +
+              " segment: " + segmentNumber + " manually.", e);
+        }
+        // Delete the segment file too
+        FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+            staleSegment));
+      }
+      staleSegments.clear();
+    }
+  }
+
+  /**
+   * This method will find all the stale segments by comparing the segment files in the
+   * metadata directory with the segments in the table status file. Any segment which has entry
+   * in the metadata folder and is not present in the table status file is considered as a
+   * stale segment. Only comparing from tablestatus file, not checking tablestatus.history file
+   */
+  private static List<String> getStaleSegments(CarbonTable carbonTable) {
+    // details contains segments in the tablestatus file, and all segments contains segments files.
+    // Segment number from those segment files is extracted and Stale segement file name is
+    // returned.
+    String metaDataLocation = carbonTable.getMetadataPath();
+    String segmentFilesLocation =
+        CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath());
+    CarbonFile[] segmentFilesList = FileFactory.getCarbonFile(segmentFilesLocation).listFiles();
+    ArrayList<String> staleSegmentList = new ArrayList<>(segmentFilesList.length);
+    // there are no segments present in the Metadata folder. Can return here
+    if (segmentFilesList.length == 0) {
+      return staleSegmentList;
+    }
+    LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+    Set<String> loadNameSet = new HashSet<>(details.length);
+    Arrays.stream(details).map(loadMetadataDetails -> loadMetadataDetails.getLoadName())
+        .forEach(loadName -> loadNameSet.add(loadName));
+    for (CarbonFile segmentFile : segmentFilesList) {
+      if (!loadNameSet.contains(segmentFile.getName().split(CarbonCommonConstants
+          .UNDERSCORE)[0])) {
+        staleSegmentList.add(segmentFile.getName());
+      }
+    }
+    loadNameSet.clear();

Review comment:
       remove it

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = FileFactory.getCarbonFile(carbonTable.getTablePath() +
+              CarbonCommonConstants.FILE_SEPARATOR + fileStore.getSegmentFile().getLocationMap()
+              .entrySet().iterator().next().getKey());
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentLocation, CarbonTablePath.getTrashFolderPath(
+              carbonTable.getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR +
+              timeStampForTrashFolder + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath
+              .SEGMENT_PREFIX + segmentNumber);
+          // Deleting the stale Segment folders.
+          try {
+            CarbonUtil.deleteFoldersAndFiles(segmentLocation);
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error("Unable to delete the segment: " + segmentNumber + " from after moving" +
+                " it to the trash folder. Please delete them manually : " + e.getMessage(), e);
+          }
+          // delete the segment file as well
+          FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+              staleSegment));
+        }
+      }
+      staleSegments.clear();
+    }
+  }
+
+  /**
+   * This method will clean all the stale segments for partition table, delete the source folders
+   * after copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegmentsForPartitionTable(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        // for each segment we get the indexfile first, then we get the carbondata file. Move both
+        // of those to trash folder
+        List<CarbonFile> filesToDelete = new ArrayList<>();
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
+            FileFactory.getConfiguration());
+        for (String indexfile : indexOrMergeFiles) {
+          // copy the index or merge file to the trash folder
+          TrashUtil.copyFileToTrashFolder(indexfile, CarbonTablePath.getTrashFolderPath(carbonTable

Review comment:
       The second parameter should be built by reusing a common method.

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = FileFactory.getCarbonFile(carbonTable.getTablePath() +
+              CarbonCommonConstants.FILE_SEPARATOR + fileStore.getSegmentFile().getLocationMap()
+              .entrySet().iterator().next().getKey());
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentLocation, CarbonTablePath.getTrashFolderPath(
+              carbonTable.getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR +
+              timeStampForTrashFolder + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath
+              .SEGMENT_PREFIX + segmentNumber);
+          // Deleting the stale Segment folders.
+          try {
+            CarbonUtil.deleteFoldersAndFiles(segmentLocation);
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error("Unable to delete the segment: " + segmentNumber + " from after moving" +
+                " it to the trash folder. Please delete them manually : " + e.getMessage(), e);
+          }
+          // delete the segment file as well
+          FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),

Review comment:
       If deleteFoldersAndFiles fails and you catch the exception,
   the segment file is still removed here, so this segment can never be cleaned again in the future.

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = FileFactory.getCarbonFile(carbonTable.getTablePath() +
+              CarbonCommonConstants.FILE_SEPARATOR + fileStore.getSegmentFile().getLocationMap()
+              .entrySet().iterator().next().getKey());
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentLocation, CarbonTablePath.getTrashFolderPath(
+              carbonTable.getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR +
+              timeStampForTrashFolder + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath
+              .SEGMENT_PREFIX + segmentNumber);
+          // Deleting the stale Segment folders.
+          try {
+            CarbonUtil.deleteFoldersAndFiles(segmentLocation);
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error("Unable to delete the segment: " + segmentNumber + " from after moving" +
+                " it to the trash folder. Please delete them manually : " + e.getMessage(), e);
+          }
+          // delete the segment file as well
+          FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+              staleSegment));
+        }
+      }
+      staleSegments.clear();
+    }
+  }
+
+  /**
+   * This method will clean all the stale segments for partition table, delete the source folders
+   * after copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegmentsForPartitionTable(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        // for each segment we get the indexfile first, then we get the carbondata file. Move both
+        // of those to trash folder
+        List<CarbonFile> filesToDelete = new ArrayList<>();
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
+            FileFactory.getConfiguration());
+        for (String indexfile : indexOrMergeFiles) {
+          // copy the index or merge file to the trash folder
+          TrashUtil.copyFileToTrashFolder(indexfile, CarbonTablePath.getTrashFolderPath(carbonTable
+              .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder +
+              CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX +
+              segmentNumber);
+          filesToDelete.add(FileFactory.getCarbonFile(indexfile));
+        }
+        // get carbondata files from here
+        Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
+        for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+          for (String file : entry.getValue()) {
+            // copy the carbondata file to trash
+            TrashUtil.copyFileToTrashFolder(file, CarbonTablePath.getTrashFolderPath(carbonTable
+                .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder
+                + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX
+                + segmentNumber);
+            filesToDelete.add(FileFactory.getCarbonFile(file));
+          }
+        }
+        // After every file of that segment has been copied, need to delete those files.
+        LOGGER.info("Segment number: " + segmentNumber + "has been successfully copied to the" +
+            " trash folder");
+        try {
+          for (CarbonFile file : filesToDelete) {
+            FileFactory.deleteFile(file.getAbsolutePath());
+            // deleting empty  partition folders of partition table
+            SegmentFileStore.deleteEmptyPartitionFolders(FileFactory.getCarbonFile(new Path(file
+                .getAbsolutePath()).getParent().toString()));
+          }
+          filesToDelete.clear();
+        } catch (IOException e) {
+          LOGGER.error("Error while deleting the source data files. Please delete the files of" +
+              " segment: " + segmentNumber + " manually.", e);
+        }
+        // Delete the segment file too
+        FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+            staleSegment));
+      }
+      staleSegments.clear();
+    }
+  }
+
+  /**
+   * This method will find all the stale segments by comparing the segment files in the
+   * metadata directory with the segments in the table status file. Any segment which has entry
+   * in the metadata folder and is not present in the table status file is considered as a
+   * stale segment. Only comparing from tablestatus file, not checking tablestatus.history file
+   */
+  private static List<String> getStaleSegments(CarbonTable carbonTable) {
+    // details contains segments in the tablestatus file, and all segments contains segments files.
+    // Segment number from those segment files is extracted and Stale segement file name is
+    // returned.
+    String metaDataLocation = carbonTable.getMetadataPath();
+    String segmentFilesLocation =
+        CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath());
+    CarbonFile[] segmentFilesList = FileFactory.getCarbonFile(segmentFilesLocation).listFiles();
+    ArrayList<String> staleSegmentList = new ArrayList<>(segmentFilesList.length);
+    // there are no segments present in the Metadata folder. Can return here
+    if (segmentFilesList.length == 0) {
+      return staleSegmentList;
+    }
+    LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+    Set<String> loadNameSet = new HashSet<>(details.length);
+    Arrays.stream(details).map(loadMetadataDetails -> loadMetadataDetails.getLoadName())
+        .forEach(loadName -> loadNameSet.add(loadName));
+    for (CarbonFile segmentFile : segmentFilesList) {
+      if (!loadNameSet.contains(segmentFile.getName().split(CarbonCommonConstants
+          .UNDERSCORE)[0])) {
+        staleSegmentList.add(segmentFile.getName());

Review comment:
       If a segment has multiple segment files, how is it processed?

##########
File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the trash folder in carbondata. This class has methods to copy data to the trash and
+ * remove data from the trash.
+ */
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(TrashUtil.class.getName());
+
+  /**
+   * Base method to copy the data to the trash folder.
+   *
+   * @param sourcePath the path from which to copy the file
+   * @param destinationPath  the path where the file will be copied
+   * @return
+   */
+  private static void copyToTrashFolder(String sourcePath, String destinationPath)
+      throws IOException {
+    DataOutputStream dataOutputStream = null;
+    DataInputStream dataInputStream = null;
+    try {
+      dataOutputStream = FileFactory.getDataOutputStream(destinationPath);
+      dataInputStream = FileFactory.getDataInputStream(sourcePath);
+      IOUtils.copyBytes(dataInputStream, dataOutputStream, CarbonCommonConstants.BYTEBUFFER_SIZE);
+    } catch (IOException exception) {
+      LOGGER.error("Unable to copy " + sourcePath + " to the trash folder", exception);
+      throw exception;
+    } finally {
+      CarbonUtil.closeStreams(dataInputStream, dataOutputStream);
+    }
+  }
+
+  /**
+   * The below method copies the complete a file to the trash folder.
+   *
+   * @param filePathToCopy the files which are to be moved to the trash folder
+   * @param trashFolderWithTimestamp    timestamp, partition folder(if any) and segment number
+   * @return
+   */
+  public static void copyFileToTrashFolder(String filePathToCopy,
+      String trashFolderWithTimestamp) throws IOException {
+    CarbonFile carbonFileToCopy = FileFactory.getCarbonFile(filePathToCopy);
+    try {
+      if (carbonFileToCopy.exists()) {
+        if (!FileFactory.isFileExist(trashFolderWithTimestamp)) {
+          FileFactory.mkdirs(trashFolderWithTimestamp);
+        }
+        if (!FileFactory.isFileExist(trashFolderWithTimestamp + CarbonCommonConstants
+            .FILE_SEPARATOR + carbonFileToCopy.getName())) {
+          copyToTrashFolder(filePathToCopy, trashFolderWithTimestamp + CarbonCommonConstants
+              .FILE_SEPARATOR + carbonFileToCopy.getName());
+        }

Review comment:
       deduplicate code

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = FileFactory.getCarbonFile(carbonTable.getTablePath() +
+              CarbonCommonConstants.FILE_SEPARATOR + fileStore.getSegmentFile().getLocationMap()
+              .entrySet().iterator().next().getKey());
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentLocation, CarbonTablePath.getTrashFolderPath(
+              carbonTable.getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR +
+              timeStampForTrashFolder + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath
+              .SEGMENT_PREFIX + segmentNumber);
+          // Deleting the stale Segment folders.
+          try {
+            CarbonUtil.deleteFoldersAndFiles(segmentLocation);
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error("Unable to delete the segment: " + segmentNumber + " from after moving" +
+                " it to the trash folder. Please delete them manually : " + e.getMessage(), e);
+          }
+          // delete the segment file as well
+          FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+              staleSegment));
+        }
+      }
+      staleSegments.clear();
+    }
+  }
+
+  /**
+   * This method will clean all the stale segments for partition table, delete the source folders
+   * after copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegmentsForPartitionTable(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        // for each segment we get the indexfile first, then we get the carbondata file. Move both
+        // of those to trash folder
+        List<CarbonFile> filesToDelete = new ArrayList<>();
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
+            FileFactory.getConfiguration());
+        for (String indexfile : indexOrMergeFiles) {
+          // copy the index or merge file to the trash folder
+          TrashUtil.copyFileToTrashFolder(indexfile, CarbonTablePath.getTrashFolderPath(carbonTable
+              .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder +
+              CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX +
+              segmentNumber);
+          filesToDelete.add(FileFactory.getCarbonFile(indexfile));
+        }
+        // get carbondata files from here
+        Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
+        for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+          for (String file : entry.getValue()) {
+            // copy the carbondata file to trash
+            TrashUtil.copyFileToTrashFolder(file, CarbonTablePath.getTrashFolderPath(carbonTable
+                .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder
+                + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX
+                + segmentNumber);
+            filesToDelete.add(FileFactory.getCarbonFile(file));
+          }
+        }
+        // After every file of that segment has been copied, need to delete those files.
+        LOGGER.info("Segment number: " + segmentNumber + "has been successfully copied to the" +
+            " trash folder");
+        try {
+          for (CarbonFile file : filesToDelete) {
+            FileFactory.deleteFile(file.getAbsolutePath());
+            // deleting empty  partition folders of partition table
+            SegmentFileStore.deleteEmptyPartitionFolders(FileFactory.getCarbonFile(new Path(file
+                .getAbsolutePath()).getParent().toString()));
+          }
+          filesToDelete.clear();
+        } catch (IOException e) {
+          LOGGER.error("Error while deleting the source data files. Please delete the files of" +
+              " segment: " + segmentNumber + " manually.", e);
+        }
+        // Delete the segment file too
+        FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+            staleSegment));
+      }
+      staleSegments.clear();
+    }
+  }
+
+  /**
+   * This method will find all the stale segments by comparing the segment files in the
+   * metadata directory with the segments in the table status file. Any segment which has entry
+   * in the metadata folder and is not present in the table status file is considered as a
+   * stale segment. Only comparing from tablestatus file, not checking tablestatus.history file
+   */
+  private static List<String> getStaleSegments(CarbonTable carbonTable) {
+    // details contains segments in the tablestatus file, and all segments contains segments files.
+    // Segment number from those segment files is extracted and Stale segement file name is
+    // returned.
+    String metaDataLocation = carbonTable.getMetadataPath();
+    String segmentFilesLocation =
+        CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath());
+    CarbonFile[] segmentFilesList = FileFactory.getCarbonFile(segmentFilesLocation).listFiles();
+    ArrayList<String> staleSegmentList = new ArrayList<>(segmentFilesList.length);
+    // there are no segments present in the Metadata folder. Can return here
+    if (segmentFilesList.length == 0) {
+      return staleSegmentList;
+    }
+    LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+    Set<String> loadNameSet = new HashSet<>(details.length);
+    Arrays.stream(details).map(loadMetadataDetails -> loadMetadataDetails.getLoadName())
+        .forEach(loadName -> loadNameSet.add(loadName));

Review comment:
       use map to collect a set

##########
File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the trash folder in carbondata. This class has methods to copy data to the trash and
+ * remove data from the trash.
+ */
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(TrashUtil.class.getName());
+
+  /**
+   * Base method to copy the data to the trash folder.
+   *
+   * @param sourcePath the path from which to copy the file
+   * @param destinationPath  the path where the file will be copied
+   * @return
+   */
+  private static void copyToTrashFolder(String sourcePath, String destinationPath)
+      throws IOException {
+    DataOutputStream dataOutputStream = null;
+    DataInputStream dataInputStream = null;
+    try {
+      dataOutputStream = FileFactory.getDataOutputStream(destinationPath);
+      dataInputStream = FileFactory.getDataInputStream(sourcePath);
+      IOUtils.copyBytes(dataInputStream, dataOutputStream, CarbonCommonConstants.BYTEBUFFER_SIZE);
+    } catch (IOException exception) {
+      LOGGER.error("Unable to copy " + sourcePath + " to the trash folder", exception);
+      throw exception;
+    } finally {
+      CarbonUtil.closeStreams(dataInputStream, dataOutputStream);
+    }
+  }
+
+  /**
+   * The below method copies the complete a file to the trash folder.
+   *
+   * @param filePathToCopy the files which are to be moved to the trash folder
+   * @param trashFolderWithTimestamp    timestamp, partition folder(if any) and segment number
+   * @return
+   */
+  public static void copyFileToTrashFolder(String filePathToCopy,
+      String trashFolderWithTimestamp) throws IOException {
+    CarbonFile carbonFileToCopy = FileFactory.getCarbonFile(filePathToCopy);
+    try {
+      if (carbonFileToCopy.exists()) {
+        if (!FileFactory.isFileExist(trashFolderWithTimestamp)) {

Review comment:
       If a segment has 100 files, this check will run 100 times. Can we change it to run only once?
   
   please implement it in copySegmentToTrash

##########
File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the trash folder in carbondata. This class has methods to copy data to the trash and
+ * remove data from the trash.
+ */
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(TrashUtil.class.getName());
+
+  /**
+   * Base method to copy the data to the trash folder.
+   *
+   * @param sourcePath the path from which to copy the file
+   * @param destinationPath  the path where the file will be copied
+   * @return
+   */
+  private static void copyToTrashFolder(String sourcePath, String destinationPath)
+      throws IOException {
+    DataOutputStream dataOutputStream = null;
+    DataInputStream dataInputStream = null;
+    try {
+      dataOutputStream = FileFactory.getDataOutputStream(destinationPath);
+      dataInputStream = FileFactory.getDataInputStream(sourcePath);
+      IOUtils.copyBytes(dataInputStream, dataOutputStream, CarbonCommonConstants.BYTEBUFFER_SIZE);
+    } catch (IOException exception) {
+      LOGGER.error("Unable to copy " + sourcePath + " to the trash folder", exception);
+      throw exception;
+    } finally {
+      CarbonUtil.closeStreams(dataInputStream, dataOutputStream);
+    }
+  }
+
+  /**
+   * The below method copies the complete a file to the trash folder.
+   *
+   * @param filePathToCopy the files which are to be moved to the trash folder
+   * @param trashFolderWithTimestamp    timestamp, partition folder(if any) and segment number
+   * @return
+   */
+  public static void copyFileToTrashFolder(String filePathToCopy,
+      String trashFolderWithTimestamp) throws IOException {
+    CarbonFile carbonFileToCopy = FileFactory.getCarbonFile(filePathToCopy);
+    try {
+      if (carbonFileToCopy.exists()) {
+        if (!FileFactory.isFileExist(trashFolderWithTimestamp)) {
+          FileFactory.mkdirs(trashFolderWithTimestamp);
+        }
+        if (!FileFactory.isFileExist(trashFolderWithTimestamp + CarbonCommonConstants
+            .FILE_SEPARATOR + carbonFileToCopy.getName())) {
+          copyToTrashFolder(filePathToCopy, trashFolderWithTimestamp + CarbonCommonConstants
+              .FILE_SEPARATOR + carbonFileToCopy.getName());
+        }
+      }
+    } catch (IOException e) {
+      // in case there is any issue while copying the file to the trash folder, we need to delete
+      // the complete segment folder from the trash folder. The trashFolderWithTimestamp contains
+      // the segment folder too. Delete the folder as it is.
+      FileFactory.deleteFile(trashFolderWithTimestamp);
+      LOGGER.error("Error while creating trash folder or copying data to the trash folder", e);
+      throw e;
+    }
+  }
+
+  /**
+   * The below method copies the complete segment folder to the trash folder. Here, the data files
+   * in segment are listed and copied one by one to the trash folder.
+   *
+   * @param segmentPath the folder which are to be moved to the trash folder
+   * @param trashFolderWithTimestamp trashfolderpath with complete timestamp and segment number
+   * @return
+   */
+  public static void copySegmentToTrash(CarbonFile segmentPath,
+      String trashFolderWithTimestamp) throws IOException {
+    try {
+      List<CarbonFile> dataFiles = FileFactory.getFolderList(segmentPath.getAbsolutePath());

Review comment:
       FileFactory.getFolderList only returns the folder list and will not include files.
   
   LocalCarbonFile provides a wrong implementation.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] QiangCai commented on a change in pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

QiangCai commented on a change in pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#discussion_r532010590



##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = FileFactory.getCarbonFile(carbonTable.getTablePath() +
+              CarbonCommonConstants.FILE_SEPARATOR + fileStore.getSegmentFile().getLocationMap()
+              .entrySet().iterator().next().getKey());
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentLocation, CarbonTablePath.getTrashFolderPath(
+              carbonTable.getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR +
+              timeStampForTrashFolder + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath
+              .SEGMENT_PREFIX + segmentNumber);
+          // Deleting the stale Segment folders.
+          try {
+            CarbonUtil.deleteFoldersAndFiles(segmentLocation);
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error("Unable to delete the segment: " + segmentNumber + " from after moving" +
+                " it to the trash folder. Please delete them manually : " + e.getMessage(), e);
+          }
+          // delete the segment file as well
+          FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+              staleSegment));
+        }
+      }
+      staleSegments.clear();
+    }
+  }
+
+  /**
+   * This method will clean all the stale segments for partition table, delete the source folders
+   * after copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegmentsForPartitionTable(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        // for each segment we get the indexfile first, then we get the carbondata file. Move both
+        // of those to trash folder
+        List<CarbonFile> filesToDelete = new ArrayList<>();
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
+            FileFactory.getConfiguration());
+        for (String indexfile : indexOrMergeFiles) {
+          // copy the index or merge file to the trash folder
+          TrashUtil.copyFileToTrashFolder(indexfile, CarbonTablePath.getTrashFolderPath(carbonTable
+              .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder +
+              CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX +
+              segmentNumber);
+          filesToDelete.add(FileFactory.getCarbonFile(indexfile));
+        }
+        // get carbondata files from here
+        Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
+        for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+          for (String file : entry.getValue()) {
+            // copy the carbondata file to trash
+            TrashUtil.copyFileToTrashFolder(file, CarbonTablePath.getTrashFolderPath(carbonTable
+                .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder
+                + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX
+                + segmentNumber);
+            filesToDelete.add(FileFactory.getCarbonFile(file));
+          }
+        }
+        // After every file of that segment has been copied, need to delete those files.
+        LOGGER.info("Segment number: " + segmentNumber + "has been successfully copied to the" +
+            " trash folder");
+        try {
+          for (CarbonFile file : filesToDelete) {
+            FileFactory.deleteFile(file.getAbsolutePath());

Review comment:
       file.delete()
   
   please optimize your code to avoid using getAbsolutePath as much as possible




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#discussion_r532357687



##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the clean files command in carbondata. This class has methods for clean files
+ * operation.

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#discussion_r532357801



##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = FileFactory.getCarbonFile(carbonTable.getTablePath() +
+              CarbonCommonConstants.FILE_SEPARATOR + fileStore.getSegmentFile().getLocationMap()
+              .entrySet().iterator().next().getKey());
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentLocation, CarbonTablePath.getTrashFolderPath(
+              carbonTable.getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR +
+              timeStampForTrashFolder + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath
+              .SEGMENT_PREFIX + segmentNumber);
+          // Deleting the stale Segment folders.
+          try {
+            CarbonUtil.deleteFoldersAndFiles(segmentLocation);
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error("Unable to delete the segment: " + segmentNumber + " from after moving" +
+                " it to the trash folder. Please delete them manually : " + e.getMessage(), e);
+          }
+          // delete the segment file as well
+          FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+              staleSegment));
+        }
+      }
+      staleSegments.clear();

Review comment:
       yes, can be removed

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#discussion_r532357875



##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#discussion_r532359426



##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = FileFactory.getCarbonFile(carbonTable.getTablePath() +
+              CarbonCommonConstants.FILE_SEPARATOR + fileStore.getSegmentFile().getLocationMap()
+              .entrySet().iterator().next().getKey());
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentLocation, CarbonTablePath.getTrashFolderPath(

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#discussion_r532360958



##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = FileFactory.getCarbonFile(carbonTable.getTablePath() +
+              CarbonCommonConstants.FILE_SEPARATOR + fileStore.getSegmentFile().getLocationMap()
+              .entrySet().iterator().next().getKey());
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentLocation, CarbonTablePath.getTrashFolderPath(
+              carbonTable.getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR +
+              timeStampForTrashFolder + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath
+              .SEGMENT_PREFIX + segmentNumber);
+          // Deleting the stale Segment folders.
+          try {
+            CarbonUtil.deleteFoldersAndFiles(segmentLocation);
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error("Unable to delete the segment: " + segmentNumber + " from after moving" +
+                " it to the trash folder. Please delete them manually : " + e.getMessage(), e);
+          }
+          // delete the segment file as well
+          FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+              staleSegment));
+        }
+      }
+      staleSegments.clear();
+    }
+  }
+
+  /**
+   * This method will clean all the stale segments for partition table, delete the source folders
+   * after copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegmentsForPartitionTable(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        // for each segment we get the indexfile first, then we get the carbondata file. Move both
+        // of those to trash folder
+        List<CarbonFile> filesToDelete = new ArrayList<>();
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
+            FileFactory.getConfiguration());
+        for (String indexfile : indexOrMergeFiles) {
+          // copy the index or merge file to the trash folder
+          TrashUtil.copyFileToTrashFolder(indexfile, CarbonTablePath.getTrashFolderPath(carbonTable

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#discussion_r532363133



##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = FileFactory.getCarbonFile(carbonTable.getTablePath() +
+              CarbonCommonConstants.FILE_SEPARATOR + fileStore.getSegmentFile().getLocationMap()
+              .entrySet().iterator().next().getKey());
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentLocation, CarbonTablePath.getTrashFolderPath(
+              carbonTable.getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR +
+              timeStampForTrashFolder + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath
+              .SEGMENT_PREFIX + segmentNumber);
+          // Deleting the stale Segment folders.
+          try {
+            CarbonUtil.deleteFoldersAndFiles(segmentLocation);
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error("Unable to delete the segment: " + segmentNumber + " from after moving" +
+                " it to the trash folder. Please delete them manually : " + e.getMessage(), e);

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = FileFactory.getCarbonFile(carbonTable.getTablePath() +
+              CarbonCommonConstants.FILE_SEPARATOR + fileStore.getSegmentFile().getLocationMap()
+              .entrySet().iterator().next().getKey());
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentLocation, CarbonTablePath.getTrashFolderPath(
+              carbonTable.getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR +
+              timeStampForTrashFolder + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath
+              .SEGMENT_PREFIX + segmentNumber);
+          // Deleting the stale Segment folders.
+          try {
+            CarbonUtil.deleteFoldersAndFiles(segmentLocation);
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error("Unable to delete the segment: " + segmentNumber + " from after moving" +
+                " it to the trash folder. Please delete them manually : " + e.getMessage(), e);
+          }
+          // delete the segment file as well
+          FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+              staleSegment));
+        }
+      }
+      staleSegments.clear();
+    }
+  }
+
+  /**
+   * This method will clean all the stale segments for partition table, delete the source folders
+   * after copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegmentsForPartitionTable(CarbonTable carbonTable)

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#discussion_r532364095



##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = FileFactory.getCarbonFile(carbonTable.getTablePath() +
+              CarbonCommonConstants.FILE_SEPARATOR + fileStore.getSegmentFile().getLocationMap()
+              .entrySet().iterator().next().getKey());
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentLocation, CarbonTablePath.getTrashFolderPath(
+              carbonTable.getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR +
+              timeStampForTrashFolder + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath
+              .SEGMENT_PREFIX + segmentNumber);
+          // Deleting the stale Segment folders.
+          try {
+            CarbonUtil.deleteFoldersAndFiles(segmentLocation);
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error("Unable to delete the segment: " + segmentNumber + " from after moving" +
+                " it to the trash folder. Please delete them manually : " + e.getMessage(), e);
+          }
+          // delete the segment file as well
+          FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+              staleSegment));
+        }
+      }
+      staleSegments.clear();
+    }
+  }
+
+  /**
+   * This method will clean all the stale segments for partition table, delete the source folders
+   * after copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegmentsForPartitionTable(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        // for each segment we get the indexfile first, then we get the carbondata file. Move both
+        // of those to trash folder
+        List<CarbonFile> filesToDelete = new ArrayList<>();
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
+            FileFactory.getConfiguration());
+        for (String indexfile : indexOrMergeFiles) {
+          // copy the index or merge file to the trash folder
+          TrashUtil.copyFileToTrashFolder(indexfile, CarbonTablePath.getTrashFolderPath(carbonTable
+              .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder +
+              CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX +
+              segmentNumber);
+          filesToDelete.add(FileFactory.getCarbonFile(indexfile));
+        }
+        // get carbondata files from here
+        Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
+        for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+          for (String file : entry.getValue()) {
+            // copy the carbondata file to trash
+            TrashUtil.copyFileToTrashFolder(file, CarbonTablePath.getTrashFolderPath(carbonTable
+                .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder
+                + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX
+                + segmentNumber);
+            filesToDelete.add(FileFactory.getCarbonFile(file));
+          }
+        }
+        // After every file of that segment has been copied, need to delete those files.
+        LOGGER.info("Segment number: " + segmentNumber + "has been successfully copied to the" +
+            " trash folder");
+        try {
+          for (CarbonFile file : filesToDelete) {
+            FileFactory.deleteFile(file.getAbsolutePath());
+            // deleting empty  partition folders of partition table
+            SegmentFileStore.deleteEmptyPartitionFolders(FileFactory.getCarbonFile(new Path(file
+                .getAbsolutePath()).getParent().toString()));
+          }
+          filesToDelete.clear();
+        } catch (IOException e) {
+          LOGGER.error("Error while deleting the source data files. Please delete the files of" +
+              " segment: " + segmentNumber + " manually.", e);
+        }
+        // Delete the segment file too
+        FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+            staleSegment));
+      }
+      staleSegments.clear();
+    }
+  }
+
+  /**
+   * This method will find all the stale segments by comparing the segment files in the
+   * metadata directory with the segments in the table status file. Any segment which has entry
+   * in the metadata folder and is not present in the table status file is considered as a
+   * stale segment. Only comparing from tablestatus file, not checking tablestatus.history file
+   */
+  private static List<String> getStaleSegments(CarbonTable carbonTable) {
+    // details contains segments in the tablestatus file, and all segments contains segments files.
+    // Segment number from those segment files is extracted and Stale segment file name is
+    // returned.
+    String metaDataLocation = carbonTable.getMetadataPath();
+    String segmentFilesLocation =
+        CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath());
+    CarbonFile[] segmentFilesList = FileFactory.getCarbonFile(segmentFilesLocation).listFiles();
+    ArrayList<String> staleSegmentList = new ArrayList<>(segmentFilesList.length);
+    // there are no segments present in the Metadata folder. Can return here
+    if (segmentFilesList.length == 0) {
+      return staleSegmentList;
+    }
+    LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+    Set<String> loadNameSet = new HashSet<>(details.length);
+    Arrays.stream(details).map(loadMetadataDetails -> loadMetadataDetails.getLoadName())
+        .forEach(loadName -> loadNameSet.add(loadName));
+    for (CarbonFile segmentFile : segmentFilesList) {
+      if (!loadNameSet.contains(segmentFile.getName().split(CarbonCommonConstants
+          .UNDERSCORE)[0])) {
+        staleSegmentList.add(segmentFile.getName());
+      }
+    }
+    loadNameSet.clear();

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#discussion_r532364299



##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = FileFactory.getCarbonFile(carbonTable.getTablePath() +
+              CarbonCommonConstants.FILE_SEPARATOR + fileStore.getSegmentFile().getLocationMap()
+              .entrySet().iterator().next().getKey());
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentLocation, CarbonTablePath.getTrashFolderPath(
+              carbonTable.getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR +
+              timeStampForTrashFolder + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath
+              .SEGMENT_PREFIX + segmentNumber);
+          // Deleting the stale Segment folders.
+          try {
+            CarbonUtil.deleteFoldersAndFiles(segmentLocation);
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error("Unable to delete the segment: " + segmentNumber + " from after moving" +
+                " it to the trash folder. Please delete them manually : " + e.getMessage(), e);
+          }
+          // delete the segment file as well
+          FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+              staleSegment));
+        }
+      }
+      staleSegments.clear();
+    }
+  }
+
+  /**
+   * This method will clean all the stale segments for partition table, delete the source folders
+   * after copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegmentsForPartitionTable(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        // for each segment we get the indexfile first, then we get the carbondata file. Move both
+        // of those to trash folder
+        List<CarbonFile> filesToDelete = new ArrayList<>();
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
+            FileFactory.getConfiguration());
+        for (String indexfile : indexOrMergeFiles) {
+          // copy the index or merge file to the trash folder
+          TrashUtil.copyFileToTrashFolder(indexfile, CarbonTablePath.getTrashFolderPath(carbonTable
+              .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder +
+              CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX +
+              segmentNumber);
+          filesToDelete.add(FileFactory.getCarbonFile(indexfile));
+        }
+        // get carbondata files from here
+        Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
+        for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+          for (String file : entry.getValue()) {
+            // copy the carbondata file to trash
+            TrashUtil.copyFileToTrashFolder(file, CarbonTablePath.getTrashFolderPath(carbonTable
+                .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder
+                + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX
+                + segmentNumber);
+            filesToDelete.add(FileFactory.getCarbonFile(file));
+          }
+        }
+        // After every file of that segment has been copied, need to delete those files.
+        LOGGER.info("Segment number: " + segmentNumber + "has been successfully copied to the" +
+            " trash folder");
+        try {
+          for (CarbonFile file : filesToDelete) {
+            FileFactory.deleteFile(file.getAbsolutePath());

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = FileFactory.getCarbonFile(carbonTable.getTablePath() +
+              CarbonCommonConstants.FILE_SEPARATOR + fileStore.getSegmentFile().getLocationMap()
+              .entrySet().iterator().next().getKey());
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentLocation, CarbonTablePath.getTrashFolderPath(
+              carbonTable.getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR +
+              timeStampForTrashFolder + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath
+              .SEGMENT_PREFIX + segmentNumber);
+          // Deleting the stale Segment folders.
+          try {
+            CarbonUtil.deleteFoldersAndFiles(segmentLocation);
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error("Unable to delete the segment: " + segmentNumber + " from after moving" +
+                " it to the trash folder. Please delete them manually : " + e.getMessage(), e);
+          }
+          // delete the segment file as well
+          FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+              staleSegment));
+        }
+      }
+      staleSegments.clear();
+    }
+  }
+
+  /**
+   * This method will clean all the stale segments for partition table, delete the source folders
+   * after copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegmentsForPartitionTable(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        // for each segment we get the indexfile first, then we get the carbondata file. Move both
+        // of those to trash folder
+        List<CarbonFile> filesToDelete = new ArrayList<>();
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
+            FileFactory.getConfiguration());
+        for (String indexfile : indexOrMergeFiles) {
+          // copy the index or merge file to the trash folder
+          TrashUtil.copyFileToTrashFolder(indexfile, CarbonTablePath.getTrashFolderPath(carbonTable
+              .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder +
+              CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX +
+              segmentNumber);
+          filesToDelete.add(FileFactory.getCarbonFile(indexfile));
+        }
+        // get carbondata files from here
+        Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
+        for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+          for (String file : entry.getValue()) {
+            // copy the carbondata file to trash
+            TrashUtil.copyFileToTrashFolder(file, CarbonTablePath.getTrashFolderPath(carbonTable
+                .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder
+                + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX
+                + segmentNumber);
+            filesToDelete.add(FileFactory.getCarbonFile(file));
+          }
+        }
+        // After every file of that segment has been copied, need to delete those files.
+        LOGGER.info("Segment number: " + segmentNumber + "has been successfully copied to the" +
+            " trash folder");
+        try {
+          for (CarbonFile file : filesToDelete) {
+            FileFactory.deleteFile(file.getAbsolutePath());
+            // deleting empty  partition folders of partition table
+            SegmentFileStore.deleteEmptyPartitionFolders(FileFactory.getCarbonFile(new Path(file
+                .getAbsolutePath()).getParent().toString()));
+          }
+          filesToDelete.clear();
+        } catch (IOException e) {
+          LOGGER.error("Error while deleting the source data files. Please delete the files of" +
+              " segment: " + segmentNumber + " manually.", e);
+        }
+        // Delete the segment file too
+        FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+            staleSegment));
+      }
+      staleSegments.clear();
+    }
+  }
+
+  /**
+   * This method will find all the stale segments by comparing the segment files in the
+   * metadata directory with the segments in the table status file. Any segment which has entry
+   * in the metadata folder and is not present in the table status file is considered as a
+   * stale segment. Only comparing from tablestatus file, not checking tablestatus.history file
+   */
+  private static List<String> getStaleSegments(CarbonTable carbonTable) {

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#discussion_r532370448



##########
File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the trash folder in carbondata. This class has methods to copy data to the trash and
+ * remove data from the trash.
+ */
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(TrashUtil.class.getName());
+
+  /**
+   * Copies the bytes of a single file to the given destination inside the trash folder.
+   *
+   * @param sourcePath the path of the file to copy
+   * @param destinationPath the path of the file to create
+   * @throws IOException if a stream cannot be opened or the copy fails; the failure is logged
+   *                     before it is rethrown
+   */
+  private static void copyToTrashFolder(String sourcePath, String destinationPath)
+      throws IOException {
+    DataInputStream dataInputStream = null;
+    DataOutputStream dataOutputStream = null;
+    try {
+      // Open the source first so that an unreadable source does not leave an empty file behind
+      // at the destination.
+      dataInputStream = FileFactory.getDataInputStream(sourcePath);
+      dataOutputStream = FileFactory.getDataOutputStream(destinationPath);
+      IOUtils.copyBytes(dataInputStream, dataOutputStream, CarbonCommonConstants.BYTEBUFFER_SIZE);
+    } catch (IOException exception) {
+      LOGGER.error("Unable to copy " + sourcePath + " to the trash folder", exception);
+      throw exception;
+    } finally {
+      CarbonUtil.closeStreams(dataInputStream, dataOutputStream);
+    }
+  }
+
+  /**
+   * The below method copies a complete file to the trash folder.
+   *
+   * @param filePathToCopy the files which are to be moved to the trash folder
+   * @param trashFolderWithTimestamp    timestamp, partition folder(if any) and segment number
+   * @return
+   */
+  public static void copyFileToTrashFolder(String filePathToCopy,
+      String trashFolderWithTimestamp) throws IOException {
+    CarbonFile carbonFileToCopy = FileFactory.getCarbonFile(filePathToCopy);
+    try {
+      if (carbonFileToCopy.exists()) {
+        if (!FileFactory.isFileExist(trashFolderWithTimestamp)) {

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#discussion_r532395502



##########
File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the trash folder in carbondata. This class has methods to copy data to the trash and
+ * remove data from the trash.
+ */
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(TrashUtil.class.getName());
+
+  /**
+   * Copies the bytes of a single file to the given destination inside the trash folder.
+   *
+   * @param sourcePath the path of the file to copy
+   * @param destinationPath the path of the file to create
+   * @throws IOException if a stream cannot be opened or the copy fails; the failure is logged
+   *                     before it is rethrown
+   */
+  private static void copyToTrashFolder(String sourcePath, String destinationPath)
+      throws IOException {
+    DataInputStream dataInputStream = null;
+    DataOutputStream dataOutputStream = null;
+    try {
+      // Open the source first so that an unreadable source does not leave an empty file behind
+      // at the destination.
+      dataInputStream = FileFactory.getDataInputStream(sourcePath);
+      dataOutputStream = FileFactory.getDataOutputStream(destinationPath);
+      IOUtils.copyBytes(dataInputStream, dataOutputStream, CarbonCommonConstants.BYTEBUFFER_SIZE);
+    } catch (IOException exception) {
+      LOGGER.error("Unable to copy " + sourcePath + " to the trash folder", exception);
+      throw exception;
+    } finally {
+      CarbonUtil.closeStreams(dataInputStream, dataOutputStream);
+    }
+  }
+
+  /**
+   * The below method copies a complete file to the trash folder.
+   *
+   * @param filePathToCopy the files which are to be moved to the trash folder
+   * @param trashFolderWithTimestamp    timestamp, partition folder(if any) and segment number
+   * @return
+   */
+  public static void copyFileToTrashFolder(String filePathToCopy,
+      String trashFolderWithTimestamp) throws IOException {
+    CarbonFile carbonFileToCopy = FileFactory.getCarbonFile(filePathToCopy);
+    try {
+      if (carbonFileToCopy.exists()) {
+        if (!FileFactory.isFileExist(trashFolderWithTimestamp)) {
+          FileFactory.mkdirs(trashFolderWithTimestamp);
+        }
+        if (!FileFactory.isFileExist(trashFolderWithTimestamp + CarbonCommonConstants
+            .FILE_SEPARATOR + carbonFileToCopy.getName())) {
+          copyToTrashFolder(filePathToCopy, trashFolderWithTimestamp + CarbonCommonConstants
+              .FILE_SEPARATOR + carbonFileToCopy.getName());
+        }

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#discussion_r532396184



##########
File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the trash folder in carbondata. This class has methods to copy data to the trash and
+ * remove data from the trash.
+ */
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(TrashUtil.class.getName());
+
+  /**
+   * Copies the bytes of a single file to the given destination inside the trash folder.
+   *
+   * @param sourcePath the path of the file to copy
+   * @param destinationPath the path of the file to create
+   * @throws IOException if a stream cannot be opened or the copy fails; the failure is logged
+   *                     before it is rethrown
+   */
+  private static void copyToTrashFolder(String sourcePath, String destinationPath)
+      throws IOException {
+    DataInputStream dataInputStream = null;
+    DataOutputStream dataOutputStream = null;
+    try {
+      // Open the source first so that an unreadable source does not leave an empty file behind
+      // at the destination.
+      dataInputStream = FileFactory.getDataInputStream(sourcePath);
+      dataOutputStream = FileFactory.getDataOutputStream(destinationPath);
+      IOUtils.copyBytes(dataInputStream, dataOutputStream, CarbonCommonConstants.BYTEBUFFER_SIZE);
+    } catch (IOException exception) {
+      LOGGER.error("Unable to copy " + sourcePath + " to the trash folder", exception);
+      throw exception;
+    } finally {
+      CarbonUtil.closeStreams(dataInputStream, dataOutputStream);
+    }
+  }
+
+  /**
+   * The below method copies the complete a file to the trash folder.
+   *
+   * @param filePathToCopy the files which are to be moved to the trash folder
+   * @param trashFolderWithTimestamp    timestamp, partition folder(if any) and segment number
+   * @return
+   */
+  public static void copyFileToTrashFolder(String filePathToCopy,
+      String trashFolderWithTimestamp) throws IOException {
+    CarbonFile carbonFileToCopy = FileFactory.getCarbonFile(filePathToCopy);
+    try {
+      if (carbonFileToCopy.exists()) {
+        if (!FileFactory.isFileExist(trashFolderWithTimestamp)) {
+          FileFactory.mkdirs(trashFolderWithTimestamp);
+        }
+        if (!FileFactory.isFileExist(trashFolderWithTimestamp + CarbonCommonConstants
+            .FILE_SEPARATOR + carbonFileToCopy.getName())) {
+          copyToTrashFolder(filePathToCopy, trashFolderWithTimestamp + CarbonCommonConstants
+              .FILE_SEPARATOR + carbonFileToCopy.getName());
+        }
+      }
+    } catch (IOException e) {
+      // in case there is any issue while copying the file to the trash folder, we need to delete
+      // the complete segment folder from the trash folder. The trashFolderWithTimestamp contains
+      // the segment folder too. Delete the folder as it is.
+      FileFactory.deleteFile(trashFolderWithTimestamp);
+      LOGGER.error("Error while creating trash folder or copying data to the trash folder", e);
+      throw e;
+    }
+  }
+
+  /**
+   * The below method copies the complete segment folder to the trash folder. Here, the data files
+   * in segment are listed and copied one by one to the trash folder.
+   *
+   * @param segmentPath the folder which are to be moved to the trash folder
+   * @param trashFolderWithTimestamp trashfolderpath with complete timestamp and segment number
+   * @return
+   */
+  public static void copySegmentToTrash(CarbonFile segmentPath,
+      String trashFolderWithTimestamp) throws IOException {
+    try {
+      List<CarbonFile> dataFiles = FileFactory.getFolderList(segmentPath.getAbsolutePath());

Review comment:
       changed to listfiles




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#discussion_r532396388



##########
File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the trash folder in carbondata. This class has methods to copy data to the trash and
+ * remove data from the trash.
+ */
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(TrashUtil.class.getName());
+
+  /**
+   * Copies the bytes of a single file to the given destination inside the trash folder.
+   *
+   * @param sourcePath the path of the file to copy
+   * @param destinationPath the path of the file to create
+   * @throws IOException if a stream cannot be opened or the copy fails; the failure is logged
+   *                     before it is rethrown
+   */
+  private static void copyToTrashFolder(String sourcePath, String destinationPath)
+      throws IOException {
+    DataInputStream dataInputStream = null;
+    DataOutputStream dataOutputStream = null;
+    try {
+      // Open the source first so that an unreadable source does not leave an empty file behind
+      // at the destination.
+      dataInputStream = FileFactory.getDataInputStream(sourcePath);
+      dataOutputStream = FileFactory.getDataOutputStream(destinationPath);
+      IOUtils.copyBytes(dataInputStream, dataOutputStream, CarbonCommonConstants.BYTEBUFFER_SIZE);
+    } catch (IOException exception) {
+      LOGGER.error("Unable to copy " + sourcePath + " to the trash folder", exception);
+      throw exception;
+    } finally {
+      CarbonUtil.closeStreams(dataInputStream, dataOutputStream);
+    }
+  }
+
+  /**
+   * The below method copies the complete a file to the trash folder.
+   *
+   * @param filePathToCopy the files which are to be moved to the trash folder
+   * @param trashFolderWithTimestamp    timestamp, partition folder(if any) and segment number
+   * @return
+   */
+  public static void copyFileToTrashFolder(String filePathToCopy,
+      String trashFolderWithTimestamp) throws IOException {
+    CarbonFile carbonFileToCopy = FileFactory.getCarbonFile(filePathToCopy);
+    try {
+      if (carbonFileToCopy.exists()) {
+        if (!FileFactory.isFileExist(trashFolderWithTimestamp)) {
+          FileFactory.mkdirs(trashFolderWithTimestamp);
+        }
+        if (!FileFactory.isFileExist(trashFolderWithTimestamp + CarbonCommonConstants
+            .FILE_SEPARATOR + carbonFileToCopy.getName())) {
+          copyToTrashFolder(filePathToCopy, trashFolderWithTimestamp + CarbonCommonConstants
+              .FILE_SEPARATOR + carbonFileToCopy.getName());
+        }
+      }
+    } catch (IOException e) {
+      // in case there is any issue while copying the file to the trash folder, we need to delete
+      // the complete segment folder from the trash folder. The trashFolderWithTimestamp contains
+      // the segment folder too. Delete the folder as it is.
+      FileFactory.deleteFile(trashFolderWithTimestamp);
+      LOGGER.error("Error while creating trash folder or copying data to the trash folder", e);
+      throw e;
+    }
+  }
+
+  /**
+   * The below method copies the complete segment folder to the trash folder. Here, the data files
+   * in segment are listed and copied one by one to the trash folder.
+   *
+   * @param segmentPath the folder which are to be moved to the trash folder
+   * @param trashFolderWithTimestamp trashfolderpath with complete timestamp and segment number
+   * @return
+   */
+  public static void copySegmentToTrash(CarbonFile segmentPath,
+      String trashFolderWithTimestamp) throws IOException {
+    try {
+      List<CarbonFile> dataFiles = FileFactory.getFolderList(segmentPath.getAbsolutePath());
+      for (CarbonFile carbonFile : dataFiles) {
+        copyFileToTrashFolder(carbonFile.getAbsolutePath(), trashFolderWithTimestamp);
+      }
+      LOGGER.info("Segment: " + segmentPath.getAbsolutePath() + " has been copied to" +
+          " the trash folder successfully");
+    } catch (IOException e) {
+      LOGGER.error("Error while getting folder list for the segment", e);
+      throw e;
+    }
+  }
+
+  /**
+   * The below method deletes timestamp subdirectories in the trash folder which have expired as
+   * per the user defined retention time
+   */
+  public static void deleteExpiredDataFromTrash(String tablePath) {
+    String trashPath = CarbonTablePath.getTrashFolderPath(tablePath);
+    // Deleting the timestamp based subdirectories in the trashfolder by the given timestamp.
+    try {
+      if (FileFactory.isFileExist(trashPath)) {
+        List<CarbonFile> timestampFolderList = FileFactory.getFolderList(trashPath);
+        for (CarbonFile timestampFolder : timestampFolderList) {
+          // If the timeStamp at which the timeStamp subdirectory has expired as per the user
+          // defined value, delete the complete timeStamp subdirectory
+          if (isTrashRetentionTimeoutExceeded(Long.parseLong(timestampFolder.getName()))) {
+            if (timestampFolder.isFileExist()) {

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4005: [CARBONDATA-3978] Trash Folder support in carbondata

GitBox
In reply to this post by GitBox

vikramahuja1001 commented on a change in pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#discussion_r532396566



##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the clean files command in carbondata. This class has methods for clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the source folder after
+   * copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = FileFactory.getCarbonFile(carbonTable.getTablePath() +
+              CarbonCommonConstants.FILE_SEPARATOR + fileStore.getSegmentFile().getLocationMap()
+              .entrySet().iterator().next().getKey());
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentLocation, CarbonTablePath.getTrashFolderPath(
+              carbonTable.getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR +
+              timeStampForTrashFolder + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath
+              .SEGMENT_PREFIX + segmentNumber);
+          // Deleting the stale Segment folders.
+          try {
+            CarbonUtil.deleteFoldersAndFiles(segmentLocation);
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error("Unable to delete the segment: " + segmentNumber + " from after moving" +
+                " it to the trash folder. Please delete them manually : " + e.getMessage(), e);
+          }
+          // delete the segment file as well
+          FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+              staleSegment));
+        }
+      }
+      staleSegments.clear();
+    }
+  }
+
+  /**
+   * This method will clean all the stale segments for partition table, delete the source folders
+   * after copying the data to the trash and also remove the .segment files of the stale segments
+   */
+  public static void cleanStaleSegmentsForPartitionTable(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        // for each segment we get the indexfile first, then we get the carbondata file. Move both
+        // of those to trash folder
+        List<CarbonFile> filesToDelete = new ArrayList<>();
+        SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
+            FileFactory.getConfiguration());
+        for (String indexfile : indexOrMergeFiles) {
+          // copy the index or merge file to the trash folder
+          TrashUtil.copyFileToTrashFolder(indexfile, CarbonTablePath.getTrashFolderPath(carbonTable
+              .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder +
+              CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX +
+              segmentNumber);
+          filesToDelete.add(FileFactory.getCarbonFile(indexfile));
+        }
+        // get carbondata files from here
+        Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
+        for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+          for (String file : entry.getValue()) {
+            // copy the carbondata file to trash
+            TrashUtil.copyFileToTrashFolder(file, CarbonTablePath.getTrashFolderPath(carbonTable
+                .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + timeStampForTrashFolder
+                + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.SEGMENT_PREFIX
+                + segmentNumber);
+            filesToDelete.add(FileFactory.getCarbonFile(file));
+          }
+        }
+        // After every file of that segment has been copied, need to delete those files.
+        LOGGER.info("Segment number: " + segmentNumber + "has been successfully copied to the" +
+            " trash folder");
+        try {
+          for (CarbonFile file : filesToDelete) {
+            FileFactory.deleteFile(file.getAbsolutePath());
+            // deleting empty  partition folders of partition table
+            SegmentFileStore.deleteEmptyPartitionFolders(FileFactory.getCarbonFile(new Path(file
+                .getAbsolutePath()).getParent().toString()));
+          }
+          filesToDelete.clear();
+        } catch (IOException e) {
+          LOGGER.error("Error while deleting the source data files. Please delete the files of" +
+              " segment: " + segmentNumber + " manually.", e);
+        }
+        // Delete the segment file too
+        FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+            staleSegment));
+      }
+      staleSegments.clear();
+    }
+  }
+
+  /**
+   * This method will find all the stale segments by comparing the segment files in the
+   * metadata directory with the segments in the table status file. Any segment which has entry
+   * in the metadata folder and is not present in the table status file is considered as a
+   * stale segment. Only comparing from tablestatus file, not checking tablestatus.history file
+   */
+  private static List<String> getStaleSegments(CarbonTable carbonTable) {
+    // details contains segments in the tablestatus file, and all segments contains segments files.
+    // Segment number from those segment files is extracted and the stale segment file name is
+    // returned.
+    String metaDataLocation = carbonTable.getMetadataPath();
+    String segmentFilesLocation =
+        CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath());
+    CarbonFile[] segmentFilesList = FileFactory.getCarbonFile(segmentFilesLocation).listFiles();
+    ArrayList<String> staleSegmentList = new ArrayList<>(segmentFilesList.length);
+    // there are no segments present in the Metadata folder. Can return here
+    if (segmentFilesList.length == 0) {
+      return staleSegmentList;
+    }
+    LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+    Set<String> loadNameSet = new HashSet<>(details.length);
+    Arrays.stream(details).map(loadMetadataDetails -> loadMetadataDetails.getLoadName())
+        .forEach(loadName -> loadNameSet.add(loadName));

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


12345678