Posted by
GitBox on
Feb 17, 2021; 10:46am
URL: http://apache-carbondata-dev-mailing-list-archive.168.s1.nabble.com/GitHub-carbondata-vikramahuja1001-opened-a-new-pull-request-4072-WIP-Clean-files-phase2-tp105322p106258.html
ajantha-bhat commented on a change in pull request #4072:
URL:
https://github.com/apache/carbondata/pull/4072#discussion_r577500588
##########
File path: integration/spark/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala
##########
@@ -26,5 +26,6 @@ case class CleanFilesPreEvent(carbonTable: CarbonTable, sparkSession: SparkSessi
case class CleanFilesPostEvent(
carbonTable: CarbonTable,
sparkSession: SparkSession,
- options: Map[String, String])
+ options: Map[String, String],
+ dryRun: Boolean)
Review comment:
Why not send this in the options map itself?
##########
File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
##########
@@ -112,13 +141,91 @@ object DataTrashManager {
carbonTable: CarbonTable,
isForceDelete: Boolean,
cleanStaleInProgress: Boolean,
- partitionSpecsOption: Option[Seq[PartitionSpec]]): Unit = {
+ partitionSpecsOption: Option[Seq[PartitionSpec]]): Seq[Long] = {
val partitionSpecs = partitionSpecsOption.map(_.asJava).orNull
- SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable,
+ val sizeStatistics = SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable,
isForceDelete, partitionSpecs, cleanStaleInProgress, true)
if (carbonTable.isHivePartitionTable && partitionSpecsOption.isDefined) {
SegmentFileStore.cleanSegments(carbonTable, partitionSpecs, isForceDelete)
}
+ sizeStatistics
+ }
+
+ private def dryRunOnExpiredSegments(
+ carbonTable: CarbonTable,
+ isForceDelete: Boolean,
+ cleanStaleInProgress: Boolean,
+ partitionSpecsOption: Option[Seq[PartitionSpec]]): Seq[Long] = {
+ var sizeFreed: Long = 0
+ var trashSizeRemaining: Long = 0
+ val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+ if (SegmentStatusManager.isLoadDeletionRequired(loadMetadataDetails)) {
+ loadMetadataDetails.foreach { oneLoad =>
+ val segmentFilePath = CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath,
+ oneLoad.getSegmentFile)
+ if (DeleteLoadFolders.canDeleteThisLoad(oneLoad, isForceDelete, cleanStaleInProgress)) {
+ // No need to consider physical data for external segments, only consider metadata.
+ if (oneLoad.getPath() == null || oneLoad.getPath().equalsIgnoreCase("NA")) {
+ if (!carbonTable.isHivePartitionTable) {
+ sizeFreed += FileFactory.getDirectorySize(CarbonTablePath.getSegmentPath(carbonTable
+ .getTablePath, oneLoad.getLoadName))
+ } else {
+ sizeFreed += partitionTableSegmentSize(carbonTable, oneLoad, loadMetadataDetails,
+ partitionSpecsOption)
+ }
+ }
+ sizeFreed += FileFactory.getCarbonFile(segmentFilePath).getSize
+ } else {
+ if (SegmentStatusManager.isExpiredSegment(oneLoad, carbonTable
+ .getAbsoluteTableIdentifier)) {
+ if (!carbonTable.isHivePartitionTable) {
+ trashSizeRemaining += FileFactory.getDirectorySize(CarbonTablePath.getSegmentPath(
Review comment:
I see that the size calculation code is duplicated in the dry-run flow and in the clean-up flow. Can we extract a common method and use it in both places?
##########
File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
##########
@@ -112,13 +141,91 @@ object DataTrashManager {
carbonTable: CarbonTable,
isForceDelete: Boolean,
cleanStaleInProgress: Boolean,
- partitionSpecsOption: Option[Seq[PartitionSpec]]): Unit = {
+ partitionSpecsOption: Option[Seq[PartitionSpec]]): Seq[Long] = {
val partitionSpecs = partitionSpecsOption.map(_.asJava).orNull
- SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable,
+ val sizeStatistics = SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable,
isForceDelete, partitionSpecs, cleanStaleInProgress, true)
if (carbonTable.isHivePartitionTable && partitionSpecsOption.isDefined) {
SegmentFileStore.cleanSegments(carbonTable, partitionSpecs, isForceDelete)
}
+ sizeStatistics
+ }
+
+ private def dryRunOnExpiredSegments(
+ carbonTable: CarbonTable,
+ isForceDelete: Boolean,
+ cleanStaleInProgress: Boolean,
+ partitionSpecsOption: Option[Seq[PartitionSpec]]): Seq[Long] = {
+ var sizeFreed: Long = 0
+ var trashSizeRemaining: Long = 0
+ val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+ if (SegmentStatusManager.isLoadDeletionRequired(loadMetadataDetails)) {
+ loadMetadataDetails.foreach { oneLoad =>
+ val segmentFilePath = CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath,
+ oneLoad.getSegmentFile)
+ if (DeleteLoadFolders.canDeleteThisLoad(oneLoad, isForceDelete, cleanStaleInProgress)) {
+ // No need to consider physical data for external segments, only consider metadata.
+ if (oneLoad.getPath() == null || oneLoad.getPath().equalsIgnoreCase("NA")) {
+ if (!carbonTable.isHivePartitionTable) {
+ sizeFreed += FileFactory.getDirectorySize(CarbonTablePath.getSegmentPath(carbonTable
+ .getTablePath, oneLoad.getLoadName))
+ } else {
+ sizeFreed += partitionTableSegmentSize(carbonTable, oneLoad, loadMetadataDetails,
+ partitionSpecsOption)
+ }
+ }
+ sizeFreed += FileFactory.getCarbonFile(segmentFilePath).getSize
+ } else {
+ if (SegmentStatusManager.isExpiredSegment(oneLoad, carbonTable
+ .getAbsoluteTableIdentifier)) {
+ if (!carbonTable.isHivePartitionTable) {
+ trashSizeRemaining += FileFactory.getDirectorySize(CarbonTablePath.getSegmentPath(
+ carbonTable.getTablePath, oneLoad.getLoadName))
+ } else {
+ trashSizeRemaining += partitionTableSegmentSize(carbonTable, oneLoad,
+ loadMetadataDetails, partitionSpecsOption)
+ }
+ trashSizeRemaining += FileFactory.getCarbonFile(segmentFilePath).getSize
+ }
+ }
+ }
+ }
+ Seq(sizeFreed, trashSizeRemaining)
+ }
+
+ def partitionTableSegmentSize( carbonTable: CarbonTable, oneLoad: LoadMetadataDetails,
+ loadMetadataDetails: Array[LoadMetadataDetails], partitionSpecsOption:
+ Option[Seq[PartitionSpec]]) : Long = {
+ var segmentSize: Long = 0
Review comment:
I see that the size calculation code is duplicated in the dry-run flow and in the clean-up flow. Can we extract a common method and use it in both places?
##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
##########
@@ -186,7 +193,9 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA
removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
sqlContext.sparkSession), "2")
- sql(s"CLEAN FILES FOR TABLE CLEANTEST").show()
+ val df1 = sql(s"CLEAN FILES FOR TABLE CLEANTEST DRYRUN")
Review comment:
Please verify (at least manually) that the correct size estimation is shown both with and without dry run (if somewhere we are adding sizes from two places, it might give incorrect results).
Also verify that the dry-run summary is the same as the actual size cleaned up, using a big table in the backend, for both partition and non-partition tables.
##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
##########
@@ -513,12 +513,13 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
protected lazy val cleanFiles: Parser[LogicalPlan] =
CLEAN ~> FILES ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident ~
- (OPTIONS ~> "(" ~> repsep(options, ",") <~ ")").? <~ opt(";") ^^ {
- case databaseName ~ tableName ~ optionList =>
+ (OPTIONS ~> "(" ~> repsep(options, ",") <~ ")").? ~ opt(DRYRUN) <~ opt(";") ^^ {
Review comment:
The dry-run option could have been added to the existing clean files parser options (similar to the force delete option). Why add a new syntax at the end?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[hidden email]